From 74075df695d482ad73d99ecd0954863f6b2840c8 Mon Sep 17 00:00:00 2001 From: Alexei Naberezhnov Date: Wed, 10 May 2023 10:20:31 -0700 Subject: [PATCH] [test] Fix and improve DaVinciClientBenchmark (#410) [test] Fix and improve DaVinciClient benchmark --- build.gradle | 3 +- .../venice/VeniceClusterInitializer.java | 48 ++---- internal/venice-test-common/build.gradle | 48 +++--- .../venice/endToEnd/DaVinciClientTest.java | 13 +- .../venice/endToEnd/DaVinciComputeTest.java | 92 +++++------ .../DaVinciLiveUpdateSuppressionTest.java | 11 +- .../venice/endToEnd/PushStatusStoreTest.java | 2 +- .../TestPushJobWithNativeReplication.java | 3 - .../integration/utils/ProcessWrapper.java | 4 +- .../utils/VeniceClusterCreateOptions.java | 32 ++++ .../utils/VeniceClusterWrapper.java | 115 ++++++-------- .../utils/VeniceClusterWrapperConstants.java | 7 + .../utils/VeniceControllerCreateOptions.java | 32 ++++ .../utils/VeniceControllerWrapper.java | 9 +- .../benchmark/DaVinciClientBenchmark.java | 150 +++++++++++------- .../src/jmh/resources/log4j2.properties | 30 ++++ .../com/linkedin/venice/utils/TestUtils.java | 12 -- 17 files changed, 350 insertions(+), 261 deletions(-) create mode 100644 internal/venice-test-common/src/jmh/resources/log4j2.properties diff --git a/build.gradle b/build.gradle index db6448efa1..5516270914 100644 --- a/build.gradle +++ b/build.gradle @@ -16,6 +16,7 @@ plugins { id 'com.github.spotbugs' version '4.8.0' apply false id 'org.gradle.test-retry' version '1.5.0' apply false id 'com.form.diff-coverage' version '0.9.5' apply false + id 'me.champeau.jmh' version '0.6.7' apply false } apply from: "$rootDir/gradle/helper/git.gradle" @@ -78,8 +79,6 @@ ext.libraries = [ javax: 'javax.servlet:javax.servlet-api:3.1.0', javaxActivation: 'com.sun.activation:javax.activation:1.2.0', jdom: 'org.jdom:jdom:1.1', - jmhCore: 'org.openjdk.jmh:jmh-core:1.28', - jmhGenerator: 'org.openjdk.jmh:jmh-generator-annprocess:1.28', jna: 'net.java.dev.jna:jna:4.5.1', jsr305: 'com.google.code.findbugs:jsr305:3.0.2', joptSimple: 'net.sf.jopt-simple:jopt-simple:3.2', diff --git a/internal/venice-avro-compatibility-test/src/test/java/com/linkedin/venice/VeniceClusterInitializer.java b/internal/venice-avro-compatibility-test/src/test/java/com/linkedin/venice/VeniceClusterInitializer.java index ce8ffea89a..04a39bf18d 100644 --- a/internal/venice-avro-compatibility-test/src/test/java/com/linkedin/venice/VeniceClusterInitializer.java +++ b/internal/venice-avro-compatibility-test/src/test/java/com/linkedin/venice/VeniceClusterInitializer.java @@ -6,8 +6,6 @@ import com.linkedin.venice.client.store.ClientConfig; import com.linkedin.venice.client.store.ClientFactory; import com.linkedin.venice.controllerapi.ControllerClient; -import com.linkedin.venice.controllerapi.ControllerResponse; -import com.linkedin.venice.controllerapi.NewStoreResponse; import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; import com.linkedin.venice.controllerapi.VersionCreationResponse; import com.linkedin.venice.exceptions.VeniceException; @@ -95,38 +93,26 @@ public VeniceClusterInitializer(String storeName, int routerPort) { this.storeName = storeName; // Create test store this.controllerClient = this.veniceCluster.getControllerClient(); - NewStoreResponse newStoreResponse = - controllerClient.createNewStore(storeName, "test_owner", KEY_SCHEMA_STR, VALUE_SCHEMA_STR); - if (newStoreResponse.isError()) { - throw new VeniceException( - "Failed to create the store: " + storeName + ", and the error: " + newStoreResponse.getError()); - } - TestUtils.createMetaSystemStore(controllerClient, storeName, Optional.of(LOGGER)); + TestUtils.assertCommand(controllerClient.createNewStore(storeName, "test_owner", KEY_SCHEMA_STR, VALUE_SCHEMA_STR)); + this.veniceCluster.createMetaSystemStore(storeName); // Enable read compute UpdateStoreQueryParams params = new UpdateStoreQueryParams(); params.setReadComputationEnabled(true); - ControllerResponse updateStoreResponse = controllerClient.updateStore(storeName, params); - if (updateStoreResponse.isError()) { - throw new VeniceException( - "Failed to update store: " + storeName + ", and the error: " + updateStoreResponse.getError()); - } - VersionCreationResponse newVersion = controllerClient.requestTopicForWrites( - storeName, - 10240000, - Version.PushType.BATCH, - Version.guidBasedDummyPushId(), - true, - false, - false, - Optional.empty(), - Optional.empty(), - Optional.empty(), - false, - -1); - if (newVersion.isError()) { - throw new VeniceException( - "Failed to create a new version for store: " + storeName + ", and error is: " + newVersion.getError()); - } + TestUtils.assertCommand(controllerClient.updateStore(storeName, params)); + VersionCreationResponse newVersion = TestUtils.assertCommand( + controllerClient.requestTopicForWrites( + storeName, + 10240000, + Version.PushType.BATCH, + Version.guidBasedDummyPushId(), + true, + false, + false, + Optional.empty(), + Optional.empty(), + Optional.empty(), + false, + -1)); this.pushVersion = newVersion.getVersion(); this.pushVersionTopic = newVersion.getKafkaTopic(); this.valueSchemaId = HelixReadOnlySchemaRepository.VALUE_SCHEMA_STARTING_ID; diff --git a/internal/venice-test-common/build.gradle b/internal/venice-test-common/build.gradle index 0d626c47d6..1307949aee 100644 --- a/internal/venice-test-common/build.gradle +++ b/internal/venice-test-common/build.gradle @@ -5,6 +5,29 @@ import java.nio.file.StandardCopyOption import java.nio.file.StandardOpenOption +apply { + plugin 'me.champeau.jmh' +} + +jmh { + fork = 3 + warmupForks = 1 + iterations = 10 + warmupIterations = 5 + timeUnit = 'ns' + resultFormat = 'json' + failOnError = true + includeTests = false + profilers = ['gc'] + benchmarkMode = ['sample'] + includes = ['DaVinciClientBenchmark'] + jvmArgs = ['-Xms4G', '-Xmx4G', '-Djmh.shutdownTimeout=0', '-Djmh.shutdownTimeout.step=0'] +} + +jmhJar { + zip64 = true +} + configurations { all { resolutionStrategy { @@ -16,16 +39,11 @@ configurations { exclude group: 'org.apache.kafka' exclude group: 'org.mortbay.jetty', module: 'servlet-api' } - jmhImplementation.extendsFrom testImplementation integrationTestImplementation.extendsFrom testImplementation integrationTestUtils } sourceSets { - jmh { - java.srcDirs = ['src/jmh/java'] - } - integrationTest { java.srcDirs = ['src/integrationtest/java'] resources.srcDirs = ['src/integrationtest/resources'] @@ -58,7 +76,9 @@ dependencies { implementation libraries.commonsCli implementation libraries.conscrypt implementation libraries.fastUtil - implementation libraries.hadoopCommon + implementation (libraries.hadoopCommon) { + exclude group: 'javax.servlet' + } implementation libraries.helix implementation libraries.httpAsyncClient implementation libraries.javax @@ -77,23 +97,12 @@ dependencies { } testImplementation project(':clients:venice-admin-tool') testImplementation project(':internal:alpini:common:alpini-common-base') - testImplementation project(":internal:venice-common").sourceSets.test.output - - jmhAnnotationProcessor libraries.jmhGenerator - jmhImplementation libraries.jmhCore + testImplementation project(':internal:venice-common').sourceSets.test.output + jmhAnnotationProcessor 'org.openjdk.jmh:jmh-generator-annprocess:' + jmh.jmhVersion.get() jmhImplementation project(path: ':internal:venice-test-common', configuration: 'integrationTestUtils') } -task jmh(type: JavaExec, dependsOn: jmhClasses) { - main = 'org.openjdk.jmh.Main' - - // In order to run just one test from the command line, specify it here, and run ./gradlew internal:venice-test-common:jmh - // main = 'com.linkedin.venice.benchmark.ZstdDecompressionBenchmark' - - classpath = sourceSets.jmh.runtimeClasspath -} - def integrationTestConfigs = { mustRunAfter test classpath = sourceSets.integrationTest.runtimeClasspath @@ -326,7 +335,6 @@ ext { diffCoverageThreshold = 0.00 } - publishing { publications { "${project.name}" (MavenPublication) { diff --git a/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index 828d18b5a3..bc86d00aba 100644 --- a/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -297,7 +297,7 @@ public void testObjectReuse(DaVinciConfig clientConfig) throws Exception { final GenericRecord value = new GenericData.Record(schema); value.put("number", 10); String storeName = cluster.createStore(KEY_COUNT, value); - cluster.useControllerClient(client -> TestUtils.createMetaSystemStore(client, storeName, Optional.of(LOGGER))); + cluster.createMetaSystemStore(storeName); String baseDataPath = Utils.getTempDataDirectory().getAbsolutePath(); VeniceProperties backendConfig = new PropertyBuilder().put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) @@ -943,7 +943,7 @@ private void setupHybridStore(String storeName, Consumer paramsConsumer.accept(params); cluster.useControllerClient(client -> { client.createNewStore(storeName, "owner", DEFAULT_KEY_SCHEMA, DEFAULT_VALUE_SCHEMA); - TestUtils.createMetaSystemStore(client, storeName, Optional.of(LOGGER)); + cluster.createMetaSystemStore(storeName); client.updateStore(storeName, params); cluster.createVersion(storeName, DEFAULT_KEY_SCHEMA, DEFAULT_VALUE_SCHEMA, Stream.of()); SystemProducer producer = IntegrationTestPushUtils.getSamzaProducer( @@ -995,11 +995,8 @@ private void setUpStore( paramsConsumer.accept(params); try (ControllerClient controllerClient = createStoreForJob(cluster, DEFAULT_KEY_SCHEMA, "\"string\"", vpjProperties)) { - TestUtils.createMetaSystemStore(controllerClient, storeName, Optional.of(LOGGER)); - ControllerResponse response = controllerClient.updateStore(storeName, params); - Assert.assertFalse(response.isError(), response.getError()); - - // Push data through VPJ. + cluster.createMetaSystemStore(storeName); + TestUtils.assertCommand(controllerClient.updateStore(storeName, params)); runVPJ(vpjProperties, 1, cluster); } } @@ -1015,7 +1012,7 @@ private static void runVPJ(Properties vpjProperties, int expectedVersionNumber, private String createStoreWithMetaSystemStore(int keyCount) throws Exception { String storeName = cluster.createStore(keyCount); - cluster.useControllerClient(client -> TestUtils.createMetaSystemStore(client, storeName, Optional.of(LOGGER))); + cluster.createMetaSystemStore(storeName); return storeName; } diff --git a/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/endToEnd/DaVinciComputeTest.java b/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/endToEnd/DaVinciComputeTest.java index 91fb8147b8..729e8141e0 100644 --- a/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/endToEnd/DaVinciComputeTest.java +++ b/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/endToEnd/DaVinciComputeTest.java @@ -168,15 +168,14 @@ public void verifyPostConditions(Method method) { // @Test(timeOut = TEST_TIMEOUT * 2) public void testComputeOnStoreWithQTFDScompliantSchema() throws Exception { final String storeName = Utils.getUniqueString("store"); - cluster.useControllerClient(client -> { - TestUtils.assertCommand( - client.createNewStore( - storeName, - getClass().getName(), - DEFAULT_KEY_SCHEMA, - VALUE_SCHEMA_FOR_COMPUTE_NULLABLE_LIST_FIELD)); - TestUtils.createMetaSystemStore(client, storeName, Optional.of(LOGGER)); - }); + cluster.useControllerClient( + client -> TestUtils.assertCommand( + client.createNewStore( + storeName, + getClass().getName(), + DEFAULT_KEY_SCHEMA, + VALUE_SCHEMA_FOR_COMPUTE_NULLABLE_LIST_FIELD))); + cluster.createMetaSystemStore(storeName); VersionCreationResponse newVersion = cluster.getNewVersion(storeName); String topic = newVersion.getKafkaTopic(); @@ -278,11 +277,12 @@ public void testComputeOnStoreWithQTFDScompliantSchema() throws Exception { public void testReadComputeMissingField() throws Exception { // Create DaVinci store final String storeName = Utils.getUniqueString("store"); - cluster.useControllerClient(client -> { - TestUtils.assertCommand( - client.createNewStore(storeName, getClass().getName(), DEFAULT_KEY_SCHEMA, VALUE_SCHEMA_FOR_COMPUTE)); - TestUtils.createMetaSystemStore(client, storeName, Optional.of(LOGGER)); - }); + cluster.useControllerClient( + client -> TestUtils.assertCommand( + client.createNewStore(storeName, getClass().getName(), DEFAULT_KEY_SCHEMA, VALUE_SCHEMA_FOR_COMPUTE)) + + ); + cluster.createMetaSystemStore(storeName); VersionCreationResponse newVersion = cluster.getNewVersion(storeName); String topic = newVersion.getKafkaTopic(); @@ -411,11 +411,14 @@ public void testReadComputeMissingField() throws Exception { public void testReadComputeSwappedFields() throws Exception { // Create DaVinci store final String storeName = Utils.getUniqueString("store"); - cluster.useControllerClient(client -> { - TestUtils.assertCommand( - client.createNewStore(storeName, getClass().getName(), DEFAULT_KEY_SCHEMA, VALUE_SCHEMA_FOR_COMPUTE_SWAPPED)); - TestUtils.createMetaSystemStore(client, storeName, Optional.of(LOGGER)); - }); + cluster.useControllerClient( + client -> TestUtils.assertCommand( + client.createNewStore( + storeName, + getClass().getName(), + DEFAULT_KEY_SCHEMA, + VALUE_SCHEMA_FOR_COMPUTE_SWAPPED))); + cluster.createMetaSystemStore(storeName); VersionCreationResponse newVersion = cluster.getNewVersion(storeName); String topic = newVersion.getKafkaTopic(); @@ -520,15 +523,14 @@ public void testReadComputeSwappedFields() throws Exception { public void testComputeStreamingExecute() throws ExecutionException, InterruptedException { // Setup Store final String storeName = Utils.getUniqueString("store"); - cluster.useControllerClient(client -> { - TestUtils.assertCommand( - client.createNewStore( - storeName, - getClass().getName(), - KEY_SCHEMA_STEAMING_COMPUTE, - VALUE_SCHEMA_STREAMING_COMPUTE)); - TestUtils.createMetaSystemStore(client, storeName, Optional.of(LOGGER)); - }); + cluster.useControllerClient( + client -> TestUtils.assertCommand( + client.createNewStore( + storeName, + getClass().getName(), + KEY_SCHEMA_STEAMING_COMPUTE, + VALUE_SCHEMA_STREAMING_COMPUTE))); + cluster.createMetaSystemStore(storeName); VersionCreationResponse newVersion = cluster.getNewVersion(storeName); String topic = newVersion.getKafkaTopic(); @@ -619,15 +621,14 @@ public void onCompletion(Optional exception) { @Test(timeOut = TEST_TIMEOUT) public void testPartialKeyLookupWithRocksDBBlockBasedTable() throws ExecutionException, InterruptedException { final String storeName = Utils.getUniqueString("store"); - cluster.useControllerClient(client -> { - TestUtils.assertCommand( - client.createNewStore( - storeName, - getClass().getName(), - KEY_SCHEMA_PARTIAL_KEY_LOOKUP, - VALUE_SCHEMA_FOR_COMPUTE)); - TestUtils.createMetaSystemStore(client, storeName, Optional.of(LOGGER)); - }); + cluster.useControllerClient( + client -> TestUtils.assertCommand( + client.createNewStore( + storeName, + getClass().getName(), + KEY_SCHEMA_PARTIAL_KEY_LOOKUP, + VALUE_SCHEMA_FOR_COMPUTE))); + cluster.createMetaSystemStore(storeName); VersionCreationResponse newVersion = cluster.getNewVersion(storeName); String topic = newVersion.getKafkaTopic(); @@ -720,15 +721,14 @@ public void onCompletion(Optional exception) { @Test(timeOut = TEST_TIMEOUT) public void testPartialKeyLookupWithRocksDBPlainTable() throws ExecutionException, InterruptedException { final String storeName = Utils.getUniqueString("store"); - cluster.useControllerClient(client -> { - TestUtils.assertCommand( - client.createNewStore( - storeName, - getClass().getName(), - KEY_SCHEMA_PARTIAL_KEY_LOOKUP, - VALUE_SCHEMA_FOR_COMPUTE)); - TestUtils.createMetaSystemStore(client, storeName, Optional.of(LOGGER)); - }); + cluster.useControllerClient( + client -> TestUtils.assertCommand( + client.createNewStore( + storeName, + getClass().getName(), + KEY_SCHEMA_PARTIAL_KEY_LOOKUP, + VALUE_SCHEMA_FOR_COMPUTE))); + cluster.createMetaSystemStore(storeName); VersionCreationResponse newVersion = cluster.getNewVersion(storeName); String topic = newVersion.getKafkaTopic(); diff --git a/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/endToEnd/DaVinciLiveUpdateSuppressionTest.java b/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/endToEnd/DaVinciLiveUpdateSuppressionTest.java index 6f3ba7d509..68a7f80198 100644 --- a/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/endToEnd/DaVinciLiveUpdateSuppressionTest.java +++ b/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/endToEnd/DaVinciLiveUpdateSuppressionTest.java @@ -14,7 +14,6 @@ import com.linkedin.davinci.client.DaVinciConfig; import com.linkedin.davinci.client.factory.CachingDaVinciClientFactory; import com.linkedin.venice.D2.D2ClientUtils; -import com.linkedin.venice.controllerapi.NewStoreResponse; import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; import com.linkedin.venice.controllerapi.VersionCreationResponse; import com.linkedin.venice.exceptions.VeniceException; @@ -93,13 +92,9 @@ public void verifyPostConditions(Method method) { public void testLiveUpdateSuppression(IngestionMode ingestionMode) throws Exception { final String storeName = Utils.getUniqueString("store"); cluster.useControllerClient(client -> { - NewStoreResponse response = - client.createNewStore(storeName, getClass().getName(), DEFAULT_KEY_SCHEMA, DEFAULT_VALUE_SCHEMA); - if (response.isError()) { - throw new VeniceException(response.getError()); - } - TestUtils.createMetaSystemStore(client, storeName, Optional.of(LOGGER)); - // Update to hybrid store + TestUtils.assertCommand( + client.createNewStore(storeName, getClass().getName(), DEFAULT_KEY_SCHEMA, DEFAULT_VALUE_SCHEMA)); + cluster.createMetaSystemStore(storeName); client.updateStore( storeName, new UpdateStoreQueryParams().setHybridRewindSeconds(10).setHybridOffsetLagThreshold(10)); diff --git a/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java b/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java index e4ee61a119..c6a649c8af 100644 --- a/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java +++ b/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java @@ -107,7 +107,7 @@ public void setUpStore() { String owner = "test"; // set up push status store TestUtils.assertCommand(controllerClient.createNewStore(storeName, owner, DEFAULT_KEY_SCHEMA, "\"string\"")); - TestUtils.createMetaSystemStore(controllerClient, storeName, Optional.of(LOGGER)); + cluster.createMetaSystemStore(storeName); TestUtils.assertCommand( controllerClient.updateStore( storeName, diff --git a/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.java b/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.java index a87eca2c25..3ff71b951f 100644 --- a/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.java +++ b/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.java @@ -323,9 +323,6 @@ public void testNativeReplicationWithIngestionIsolationInDaVinci() throws Except Assert.assertEquals(job.getKafkaUrl(), childDatacenters.get(0).getKafkaBrokerWrapper().getAddress()); } - // Setup meta system store for Da Vinci usage. - TestUtils.createMetaSystemStore(parentControllerClient, storeName, Optional.of(LOGGER)); - // Test Da-vinci client is able to consume from NR region which is consuming remotely VeniceMultiClusterWrapper childDataCenter = childDatacenters.get(1); diff --git a/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/integration/utils/ProcessWrapper.java b/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/integration/utils/ProcessWrapper.java index 67c2bbe7a3..58556b8129 100644 --- a/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/integration/utils/ProcessWrapper.java +++ b/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/integration/utils/ProcessWrapper.java @@ -53,9 +53,7 @@ public abstract class ProcessWrapper implements Closeable { } if (firstUselessElement > 0) { StackTraceElement[] prunedStackTraceElements = new StackTraceElement[firstUselessElement]; - for (int i = 0; i < prunedStackTraceElements.length; i++) { - prunedStackTraceElements[i] = stackTraceElements[i]; - } + System.arraycopy(stackTraceElements, 0, prunedStackTraceElements, 0, prunedStackTraceElements.length); this.constructionCallstack.setStackTrace(prunedStackTraceElements); } diff --git a/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/integration/utils/VeniceClusterCreateOptions.java b/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/integration/utils/VeniceClusterCreateOptions.java index 973be2e35d..7599503fa3 100644 --- a/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/integration/utils/VeniceClusterCreateOptions.java +++ b/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/integration/utils/VeniceClusterCreateOptions.java @@ -1,7 +1,9 @@ package com.linkedin.venice.integration.utils; import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_DELAYED_TO_REBALANCE_MS; +import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_MAX_NUMBER_OF_PARTITIONS; import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_NUMBER_OF_CONTROLLERS; +import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_NUMBER_OF_PARTITIONS; import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_NUMBER_OF_ROUTERS; import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_NUMBER_OF_SERVERS; import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_PARTITION_SIZE_BYTES; @@ -25,6 +27,8 @@ public class VeniceClusterCreateOptions { private final int numberOfRouters; private final int replicationFactor; private final int partitionSize; + private final int numberOfPartitions; + private final int maxNumberOfPartitions; private final int minActiveReplica; private final long rebalanceDelayMs; private final boolean standalone; @@ -49,6 +53,8 @@ private VeniceClusterCreateOptions(Builder builder) { this.numberOfRouters = builder.numberOfRouters; this.replicationFactor = builder.replicationFactor; this.partitionSize = builder.partitionSize; + this.numberOfPartitions = builder.numberOfPartitions; + this.maxNumberOfPartitions = builder.maxNumberOfPartitions; this.minActiveReplica = builder.minActiveReplica; this.rebalanceDelayMs = builder.rebalanceDelayMs; this.standalone = builder.standalone; @@ -100,6 +106,14 @@ public int getPartitionSize() { return partitionSize; } + public int getNumberOfPartitions() { + return numberOfPartitions; + } + + public int getMaxNumberOfPartitions() { + return maxNumberOfPartitions; + } + public int getMinActiveReplica() { return minActiveReplica; } @@ -182,6 +196,12 @@ public String toString() { .append("partitionSize:") .append(partitionSize) .append(", ") + .append("numberOfPartitions:") + .append(numberOfPartitions) + .append(", ") + .append("maxNumberOfPartitions:") + .append(maxNumberOfPartitions) + .append(", ") .append("minActiveReplica:") .append(minActiveReplica) .append(", ") @@ -233,6 +253,8 @@ public static class Builder { private int numberOfRouters = DEFAULT_NUMBER_OF_ROUTERS; private int replicationFactor = DEFAULT_REPLICATION_FACTOR; private int partitionSize = DEFAULT_PARTITION_SIZE_BYTES; + private int numberOfPartitions = DEFAULT_NUMBER_OF_PARTITIONS; + private int maxNumberOfPartitions = DEFAULT_MAX_NUMBER_OF_PARTITIONS; private int minActiveReplica; private long rebalanceDelayMs = DEFAULT_DELAYED_TO_REBALANCE_MS; private boolean standalone = true; // set to false for multi-cluster @@ -293,6 +315,16 @@ public Builder partitionSize(int partitionSize) { return this; } + public Builder numberOfPartitions(int numberOfPartitions) { + this.numberOfPartitions = numberOfPartitions; + return this; + } + + public Builder maxNumberOfPartitions(int maxNumberOfPartitions) { + this.maxNumberOfPartitions = maxNumberOfPartitions; + return this; + } + public Builder minActiveReplica(int minActiveReplica) { this.minActiveReplica = minActiveReplica; this.isMinActiveReplicaSet = true; diff --git a/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/integration/utils/VeniceClusterWrapper.java b/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/integration/utils/VeniceClusterWrapper.java index 1df14654bb..4f529e561d 100644 --- a/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/integration/utils/VeniceClusterWrapper.java +++ b/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/integration/utils/VeniceClusterWrapper.java @@ -13,6 +13,7 @@ import com.github.luben.zstd.ZstdDictTrainer; import com.linkedin.venice.client.store.ClientConfig; +import com.linkedin.venice.common.VeniceSystemStoreType; import com.linkedin.venice.compression.CompressionStrategy; import com.linkedin.venice.controller.Admin; import com.linkedin.venice.controller.init.ClusterLeaderInitializationRoutine; @@ -95,23 +96,14 @@ public class VeniceClusterWrapper extends ProcessWrapper { public static final String FORKED_PROCESS_ZK_ADDRESS = "zkAddress"; public static final int NUM_RECORDS = 1_000_000; - private final String regionName; - private final String clusterName; - private final boolean standalone; + private final VeniceClusterCreateOptions options; private final ZkServerWrapper zkServerWrapper; private final PubSubBrokerWrapper pubSubBrokerWrapper; - - private final int defaultReplicaFactor; - private final int defaultPartitionSize; - private final long defaultDelayToRebalanceMS; - private final int defaultMinActiveReplica; private final Map veniceControllerWrappers; private final Map veniceServerWrappers; private final Map veniceRouterWrappers; private final LazyResettable controllerClient = LazyResettable.of(this::getControllerClient, ControllerClient::close); - private final boolean sslToStorageNodes; - private final boolean sslToKafka; private final Map clusterToD2; private final Map clusterToServerD2; @@ -136,38 +128,22 @@ public class VeniceClusterWrapper extends ProcessWrapper { new HashSet<>(Arrays.asList(hybridRequiredSystemStores)); VeniceClusterWrapper( - String regionName, - String clusterName, - boolean standalone, + VeniceClusterCreateOptions options, ZkServerWrapper zkServerWrapper, PubSubBrokerWrapper pubSubBrokerWrapper, Map veniceControllerWrappers, Map veniceServerWrappers, Map veniceRouterWrappers, - int defaultReplicaFactor, - int defaultPartitionSize, - long defaultDelayToRebalanceMS, - int mintActiveReplica, - boolean sslToStorageNodes, - boolean sslToKafka, Map clusterToD2, Map clusterToServerD2) { super(SERVICE_NAME, null); - this.regionName = regionName; - this.standalone = standalone; - this.clusterName = clusterName; + this.options = options; this.zkServerWrapper = zkServerWrapper; this.pubSubBrokerWrapper = pubSubBrokerWrapper; this.veniceControllerWrappers = veniceControllerWrappers; this.veniceServerWrappers = veniceServerWrappers; this.veniceRouterWrappers = veniceRouterWrappers; - this.defaultReplicaFactor = defaultReplicaFactor; - this.defaultPartitionSize = defaultPartitionSize; - this.defaultDelayToRebalanceMS = defaultDelayToRebalanceMS; - this.defaultMinActiveReplica = mintActiveReplica; - this.sslToStorageNodes = sslToStorageNodes; - this.sslToKafka = sslToKafka; this.clusterToD2 = clusterToD2; this.clusterToServerD2 = clusterToServerD2; } @@ -223,6 +199,8 @@ static ServiceProvider generateService(VeniceClusterCreate new VeniceControllerCreateOptions.Builder(options.getClusterName(), zkServerWrapper, pubSubBrokerWrapper) .replicationFactor(options.getReplicationFactor()) .partitionSize(options.getPartitionSize()) + .numberOfPartitions(options.getNumberOfPartitions()) + .maxNumberOfPartitions(options.getMaxNumberOfPartitions()) .rebalanceDelayMs(options.getRebalanceDelayMs()) .minActiveReplica(options.getMinActiveReplica()) .clusterToD2(clusterToD2) @@ -310,20 +288,12 @@ static ServiceProvider generateService(VeniceClusterCreate VeniceClusterWrapper veniceClusterWrapper = null; try { veniceClusterWrapper = new VeniceClusterWrapper( - options.getRegionName(), - options.getClusterName(), - options.isStandalone(), + options, finalZkServerWrapper, finalPubSubBrokerWrapper, veniceControllerWrappers, veniceServerWrappers, veniceRouterWrappers, - options.getReplicationFactor(), - options.getPartitionSize(), - options.getRebalanceDelayMs(), - options.getMinActiveReplica(), - options.isSslToStorageNodes(), - options.isSslToKafka(), clusterToD2, clusterToServerD2); // Wait for all the asynchronous ClusterLeaderInitializationRoutine to complete before returning the @@ -428,7 +398,7 @@ protected void internalStop() throws Exception { veniceRouterWrappers.values().forEach(Utils::closeQuietlyWithErrorLogged); veniceServerWrappers.values().forEach(Utils::closeQuietlyWithErrorLogged); veniceControllerWrappers.values().forEach(Utils::closeQuietlyWithErrorLogged); - if (standalone) { + if (options.isStandalone()) { Utils.closeQuietlyWithErrorLogged(pubSubBrokerWrapper); Utils.closeQuietlyWithErrorLogged(zkServerWrapper); } @@ -455,13 +425,14 @@ public int getPort() { @Override public String getComponentTagForLogging() { - return new StringBuilder(getComponentTagPrefix(regionName)).append(getComponentTagPrefix(getClusterName())) + return new StringBuilder(getComponentTagPrefix(options.getRegionName())) + .append(getComponentTagPrefix(getClusterName())) .append(getServiceName()) .toString(); } public String getClusterName() { - return clusterName; + return options.getClusterName(); } public ZkServerWrapper getZk() { @@ -527,24 +498,26 @@ public synchronized VeniceControllerWrapper getLeaderVeniceController(long timeo long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs); while (System.nanoTime() < deadline) { for (VeniceControllerWrapper controller: veniceControllerWrappers.values()) { - if (controller.isRunning() && controller.isLeaderController(clusterName)) { + if (controller.isRunning() && controller.isLeaderController(getClusterName())) { return controller; } } Utils.sleep(Time.MS_PER_SECOND); } - throw new VeniceException("Leader controller does not exist, cluster=" + clusterName); + throw new VeniceException("Leader controller does not exist, cluster=" + getClusterName()); } public VeniceControllerWrapper addVeniceController(Properties properties) { VeniceControllerWrapper veniceControllerWrapper = ServiceFactory.getVeniceController( - new VeniceControllerCreateOptions.Builder(clusterName, zkServerWrapper, pubSubBrokerWrapper) - .regionName(regionName) - .replicationFactor(defaultReplicaFactor) - .partitionSize(defaultPartitionSize) - .rebalanceDelayMs(defaultDelayToRebalanceMS) - .minActiveReplica(defaultMinActiveReplica) - .sslToKafka(sslToKafka) + new VeniceControllerCreateOptions.Builder(getClusterName(), zkServerWrapper, pubSubBrokerWrapper) + .regionName(options.getRegionName()) + .replicationFactor(options.getReplicationFactor()) + .partitionSize(options.getPartitionSize()) + .numberOfPartitions(options.getNumberOfPartitions()) + .maxNumberOfPartitions(options.getMaxNumberOfPartitions()) + .rebalanceDelayMs(options.getRebalanceDelayMs()) + .minActiveReplica(options.getMinActiveReplica()) + .sslToKafka(options.isSslToKafka()) .clusterToD2(clusterToD2) .clusterToServerD2(clusterToServerD2) .extraProperties(properties) @@ -565,11 +538,11 @@ public void addVeniceControllerWrapper(VeniceControllerWrapper veniceControllerW public VeniceRouterWrapper addVeniceRouter(Properties properties) { VeniceRouterWrapper veniceRouterWrapper = ServiceFactory.getVeniceRouter( - regionName, - clusterName, + options.getRegionName(), + getClusterName(), zkServerWrapper, pubSubBrokerWrapper, - sslToStorageNodes, + options.isSslToStorageNodes(), clusterToD2, clusterToServerD2, properties); @@ -591,13 +564,13 @@ public VeniceServerWrapper addVeniceServer(boolean enableAllowlist, boolean enab featureProperties.setProperty(SERVER_ENABLE_SERVER_ALLOW_LIST, Boolean.toString(enableAllowlist)); featureProperties.setProperty(SERVER_IS_AUTO_JOIN, Boolean.toString(enableAutoJoinAllowList)); VeniceServerWrapper veniceServerWrapper = ServiceFactory.getVeniceServer( - regionName, - clusterName, + options.getRegionName(), + getClusterName(), pubSubBrokerWrapper, zkServerWrapper.getAddress(), featureProperties, new Properties(), - clusterToServerD2.get(clusterName)); + clusterToServerD2.get(getClusterName())); synchronized (this) { veniceServerWrappers.put(veniceServerWrapper.getPort(), veniceServerWrapper); } @@ -612,13 +585,13 @@ public VeniceServerWrapper addVeniceServer(boolean enableAllowlist, boolean enab */ public VeniceServerWrapper addVeniceServer(Properties properties) { VeniceServerWrapper veniceServerWrapper = ServiceFactory.getVeniceServer( - regionName, - clusterName, + options.getRegionName(), + getClusterName(), pubSubBrokerWrapper, zkServerWrapper.getAddress(), new Properties(), properties, - clusterToServerD2.get(clusterName)); + clusterToServerD2.get(getClusterName())); synchronized (this) { veniceServerWrappers.put(veniceServerWrapper.getPort(), veniceServerWrapper); } @@ -627,13 +600,13 @@ public VeniceServerWrapper addVeniceServer(Properties properties) { public VeniceServerWrapper addVeniceServer(Properties featureProperties, Properties configProperties) { VeniceServerWrapper veniceServerWrapper = ServiceFactory.getVeniceServer( - regionName, - clusterName, + options.getRegionName(), + getClusterName(), pubSubBrokerWrapper, zkServerWrapper.getAddress(), featureProperties, configProperties, - clusterToServerD2.get(clusterName)); + clusterToServerD2.get(getClusterName())); synchronized (this) { veniceServerWrappers.put(veniceServerWrapper.getPort(), veniceServerWrapper); } @@ -694,7 +667,7 @@ public synchronized void removeVeniceRouter(int port) { public synchronized List stopVeniceServer(int port) { Admin admin = getLeaderVeniceController().getVeniceAdmin(); List effectedReplicas = - admin.getReplicasOfStorageNode(clusterName, Utils.getHelixNodeIdentifier(Utils.getHostName(), port)); + admin.getReplicasOfStorageNode(getClusterName(), Utils.getHelixNodeIdentifier(Utils.getHostName(), port)); stopVeniceComponent(veniceServerWrappers, port); return effectedReplicas; } @@ -766,7 +739,7 @@ private T getRandomRunningVeniceComponent(Map controllerClientConsumer) { @@ -884,6 +857,12 @@ public ControllerResponse updateStore(String storeName, UpdateStoreQueryParams p return assertCommand(controllerClient.get().updateStore(storeName, params)); } + public void createMetaSystemStore(String storeName) { + String metaSystemStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName); + assertCommand(controllerClient.get().emptyPush(metaSystemStoreName, "createMetaSystemStore", 1L)); + waitVersion(metaSystemStoreName, 1); + } + public static final String DEFAULT_KEY_SCHEMA = "\"int\""; public static final String DEFAULT_VALUE_SCHEMA = "\"int\""; @@ -914,11 +893,15 @@ public String createStore(Stream batchData) { } public String createStore(int keyCount, GenericRecord record) { + return createStore(keyCount, record, CompressionStrategy.NO_OP); + } + + public String createStore(int keyCount, GenericRecord record, CompressionStrategy compressionStrategy) { return createStore( DEFAULT_KEY_SCHEMA, record.getSchema().toString(), IntStream.range(0, keyCount).mapToObj(i -> new AbstractMap.SimpleEntry<>(i, record)), - CompressionStrategy.NO_OP, + compressionStrategy, null); } @@ -1054,7 +1037,7 @@ public static void main(String[] args) throws IOException { TestWriteUtils.writeSimpleAvroFileWithCustomSize(inputDir, NUM_RECORDS, 10, 20); - try (ControllerClient client = new ControllerClient(veniceClusterWrapper.clusterName, controllerUrl)) { + try (ControllerClient client = new ControllerClient(veniceClusterWrapper.getClusterName(), controllerUrl)) { TestUtils.assertCommand(client.createNewStore(storeName, "ownerOf" + storeName, KEY_SCHEMA, VALUE_SCHEMA)); TestUtils.assertCommand( @@ -1082,7 +1065,7 @@ public static void main(String[] args) throws IOException { } public String getRegionName() { - return regionName; + return options.getRegionName(); } public Map getClusterToServerD2() { diff --git a/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/integration/utils/VeniceClusterWrapperConstants.java b/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/integration/utils/VeniceClusterWrapperConstants.java index 95094e5814..91347ef78b 100644 --- a/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/integration/utils/VeniceClusterWrapperConstants.java +++ b/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/integration/utils/VeniceClusterWrapperConstants.java @@ -7,6 +7,13 @@ public class VeniceClusterWrapperConstants { public static final int DEFAULT_MAX_ATTEMPT = 10; public static final int DEFAULT_REPLICATION_FACTOR = 1; public static final int DEFAULT_PARTITION_SIZE_BYTES = 100; + /** + * Running with just one partition may not fully exercise the distributed nature of the system, + * but we do want to minimize the number as each partition results in files, connections, threads, etc. + * in the whole system. 3 seems like a reasonable tradeoff between these concerns. + */ + public static final int DEFAULT_NUMBER_OF_PARTITIONS = 1; + public static final int DEFAULT_MAX_NUMBER_OF_PARTITIONS = 3; // By default, disable the delayed rebalance for testing. public static final long DEFAULT_DELAYED_TO_REBALANCE_MS = 0; public static final boolean DEFAULT_SSL_TO_STORAGE_NODES = false; diff --git a/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/integration/utils/VeniceControllerCreateOptions.java b/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/integration/utils/VeniceControllerCreateOptions.java index 7c05ce597e..c1822766cf 100644 --- a/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/integration/utils/VeniceControllerCreateOptions.java +++ b/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/integration/utils/VeniceControllerCreateOptions.java @@ -4,6 +4,8 @@ import static com.linkedin.venice.ConfigKeys.CONTROLLER_AUTO_MATERIALIZE_META_SYSTEM_STORE; import static com.linkedin.venice.ConfigKeys.LOCAL_REGION_NAME; import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_DELAYED_TO_REBALANCE_MS; +import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_MAX_NUMBER_OF_PARTITIONS; +import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_NUMBER_OF_PARTITIONS; import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_PARTITION_SIZE_BYTES; import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_REPLICATION_FACTOR; import static com.linkedin.venice.integration.utils.VeniceControllerWrapper.DEFAULT_PARENT_DATA_CENTER_REGION_NAME; @@ -22,6 +24,8 @@ public class VeniceControllerCreateOptions { private final boolean d2Enabled; private final int replicationFactor; private final int partitionSize; + private final int numberOfPartitions; + private final int maxNumberOfPartitions; private final int minActiveReplica; private final long rebalanceDelayMs; private final String[] clusterNames; @@ -39,6 +43,8 @@ private VeniceControllerCreateOptions(Builder builder) { d2Enabled = builder.d2Enabled; replicationFactor = builder.replicationFactor; partitionSize = builder.partitionSize; + numberOfPartitions = builder.numberOfPartitions; + maxNumberOfPartitions = builder.maxNumberOfPartitions; minActiveReplica = builder.minActiveReplica; rebalanceDelayMs = builder.rebalanceDelayMs; clusterNames = builder.clusterNames; @@ -70,6 +76,12 @@ public String toString() { .append("partitionSize:") .append(partitionSize) .append(", ") + .append("numberOfPartitions:") + .append(numberOfPartitions) + .append(", ") + .append("maxNumberOfPartitions:") + .append(maxNumberOfPartitions) + .append(", ") .append("minActiveReplica:") .append(minActiveReplica) .append(", ") @@ -132,6 +144,14 @@ public int getPartitionSize() { return partitionSize; } + public int getNumberOfPartitions() { + return numberOfPartitions; + } + + public int getMaxNumberOfPartitions() { + return maxNumberOfPartitions; + } + public int getMinActiveReplica() { return minActiveReplica; } @@ -185,6 +205,8 @@ public static class Builder { private boolean isMinActiveReplicaSet = false; private int replicationFactor = DEFAULT_REPLICATION_FACTOR; private int partitionSize = DEFAULT_PARTITION_SIZE_BYTES; + private int numberOfPartitions = DEFAULT_NUMBER_OF_PARTITIONS; + private int maxNumberOfPartitions = DEFAULT_MAX_NUMBER_OF_PARTITIONS; private int minActiveReplica; private long rebalanceDelayMs = DEFAULT_DELAYED_TO_REBALANCE_MS; private Map clusterToD2 = null; @@ -225,6 +247,16 @@ public Builder partitionSize(int partitionSize) { return this; } + public Builder numberOfPartitions(int numberOfPartitions) { + this.numberOfPartitions = numberOfPartitions; + return this; + } + + public Builder maxNumberOfPartitions(int maxNumberOfPartitions) { + this.maxNumberOfPartitions = maxNumberOfPartitions; + return this; + } + public Builder minActiveReplica(int minActiveReplica) { this.minActiveReplica = minActiveReplica; this.isMinActiveReplicaSet = true; diff --git a/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java b/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java index f9a8c16810..824fda625c 100644 --- a/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java +++ b/internal/venice-test-common/src/integrationtest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java @@ -166,16 +166,11 @@ static StatefulServiceProvider generateService(VeniceCo .put(ADMIN_TOPIC_REPLICATION_FACTOR, 1) .put(CONTROLLER_NAME, "venice-controller") // Why is this configurable? .put(DEFAULT_REPLICA_FACTOR, options.getReplicationFactor()) - .put(DEFAULT_NUMBER_OF_PARTITION, 1) .put(ADMIN_PORT, adminPort) .put(ADMIN_SECURE_PORT, adminSecurePort) - /** - * Running with just one partition may not fully exercise the distributed nature of the system, - * but we do want to minimize the number as each partition results in files, connections, threads, etc. - * in the whole system. 3 seems like a reasonable tradeoff between these concerns. - */ - .put(DEFAULT_MAX_NUMBER_OF_PARTITIONS, 3) .put(DEFAULT_PARTITION_SIZE, options.getPartitionSize()) + .put(DEFAULT_NUMBER_OF_PARTITION, options.getNumberOfPartitions()) + .put(DEFAULT_MAX_NUMBER_OF_PARTITIONS, options.getMaxNumberOfPartitions()) .put(CONTROLLER_PARENT_MODE, options.isParent()) .put(DELAY_TO_REBALANCE_MS, options.getRebalanceDelayMs()) .put(MIN_ACTIVE_REPLICA, options.getMinActiveReplica()) diff --git a/internal/venice-test-common/src/jmh/java/com/linkedin/venice/benchmark/DaVinciClientBenchmark.java b/internal/venice-test-common/src/jmh/java/com/linkedin/venice/benchmark/DaVinciClientBenchmark.java index b674a922da..65c56c2811 100644 --- a/internal/venice-test-common/src/jmh/java/com/linkedin/venice/benchmark/DaVinciClientBenchmark.java +++ b/internal/venice-test-common/src/jmh/java/com/linkedin/venice/benchmark/DaVinciClientBenchmark.java @@ -3,15 +3,19 @@ import static com.linkedin.venice.integration.utils.ServiceFactory.getGenericAvroDaVinciClient; import static com.linkedin.venice.integration.utils.ServiceFactory.getVeniceCluster; +import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; import com.linkedin.davinci.client.DaVinciClient; -import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.integration.utils.VeniceClusterCreateOptions; import com.linkedin.venice.integration.utils.VeniceClusterWrapper; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Utils; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; @@ -34,30 +38,34 @@ import org.openjdk.jmh.runner.Runner; import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.testng.Assert; -@Fork(value = 1, warmups = 1, jvmArgs = { "-Xms4G", "-Xmx4G" }) -@Warmup(iterations = 5) -@Measurement(iterations = 10) +@Fork(value = 1, jvmArgs = { "-Xms4G", "-Xmx4G", "-Djmh.shutdownTimeout=0", "-Djmh.shutdownTimeout.step=0" }) +@Warmup(iterations = 0) +@Measurement(iterations = 1) @BenchmarkMode(Mode.SampleTime) @OutputTimeUnit(TimeUnit.NANOSECONDS) @State(Scope.Benchmark) public class DaVinciClientBenchmark { - protected static final int KEY_COUNT = 100_000; - protected static final String VALUE_FIELD = "value"; + @Param({ "1000000000" }) // 1GB + long dataSize; - @Param({ "DENSE_VECTOR" }) - protected String valueType; + @Param({ "10000" }) // 10KB + int valueSize; - @Param({ "2500" }) - protected int valueLength; + int keyCount; - protected VeniceClusterWrapper cluster; - protected DaVinciClient client; + @Param({ "1" }) + int partitionCount; + + VeniceClusterWrapper cluster; + DaVinciClient client; public static void main(String[] args) throws Exception { Options options = new OptionsBuilder().include(DaVinciClientBenchmark.class.getSimpleName()) .addProfiler(GCProfiler.class) + .shouldFailOnError(true) .build(); new Runner(options).run(); } @@ -65,17 +73,17 @@ public static void main(String[] args) throws Exception { @Setup public void setUp() throws Exception { Utils.thisIsLocalhost(); - cluster = getVeniceCluster(1, 1, 1); - - String storeName; - if (valueType.equals("DENSE_VECTOR")) { - storeName = buildDenseVectorStore(cluster); - } else if (valueType.equals("SPARSE_VECTOR")) { - storeName = buildSparseVectorStore(cluster); - } else { - throw new VeniceException("Value type " + valueType + " is not supported in benchmark."); - } - + cluster = getVeniceCluster( + new VeniceClusterCreateOptions.Builder().replicationFactor(1) + .numberOfPartitions(partitionCount) + .maxNumberOfPartitions(partitionCount) + .numberOfServers(1) + .numberOfRouters(1) + .numberOfControllers(1) + .build()); + + keyCount = (int) (dataSize / valueSize); + String storeName = buildDenseVectorStore(cluster); client = getGenericAvroDaVinciClient(storeName, cluster); client.subscribeAll().get(5, TimeUnit.MINUTES); @@ -95,61 +103,95 @@ public void cleanUp() { } @State(Scope.Thread) - public static class ThreadContext { - public int key; - public GenericRecord record; + public static class SingleGetThreadContext { + int key; + GenericRecord record; @Setup(Level.Invocation) - public void setUp() { - key = ThreadLocalRandom.current().nextInt(KEY_COUNT); + public void setUp(DaVinciClientBenchmark benchmark) { + key = ThreadLocalRandom.current().nextInt(benchmark.keyCount); } } @Benchmark @Threads(1) - public void singleGetHitT1(ThreadContext context, Blackhole blackhole) throws Exception { + public void singleGetHitT1(SingleGetThreadContext context, Blackhole blackhole) throws Exception { singleGetHit(context, blackhole); } + @Benchmark + @Threads(1) + public void singleGetMissT1(SingleGetThreadContext context, Blackhole blackhole) throws Exception { + singleGetMiss(context, blackhole); + } + @Benchmark @Threads(8) - public void singleGetHitT8(ThreadContext context, Blackhole blackhole) throws Exception { + public void singleGetHitT8(SingleGetThreadContext context, Blackhole blackhole) throws Exception { singleGetHit(context, blackhole); } - protected void singleGetHit(ThreadContext context, Blackhole blackhole) throws Exception { + protected void singleGetHit(SingleGetThreadContext context, Blackhole blackhole) throws Exception { context.record = client.get(context.key, context.record).get(); blackhole.consume(context.record); + Assert.assertNotNull(context.record, "Key=" + context.key); } - protected String buildDenseVectorStore(VeniceClusterWrapper cluster) throws Exception { - Schema schema = Schema.parse( - "{" + " \"namespace\" : \"example.avro\"," + " \"type\": \"record\"," + " \"name\": \"DenseVector\"," - + " \"fields\": [" + " { \"name\": \"value\", \"type\": {\"type\": \"array\", \"items\": \"float\"} }" - + " ]" + "}"); - GenericRecord record = new GenericData.Record(schema); - List values = new ArrayList<>(); - for (int i = 0; i < valueLength; i++) { - values.add((float) i); + protected void singleGetMiss(SingleGetThreadContext context, Blackhole blackhole) throws Exception { + context.record = client.get(~context.key, context.record).get(); + blackhole.consume(context.record); + Assert.assertNull(context.record, "Key=" + context.key); + } + + @State(Scope.Thread) + public static class BatchGetThreadContext { + @Param({ "100" }) + int batchGetSize; + Set keys; + Map result; + + @Setup(Level.Invocation) + public void setUp(DaVinciClientBenchmark benchmark) { + keys = ThreadLocalRandom.current() + .ints(0, benchmark.keyCount) + .distinct() + .limit(batchGetSize) + .boxed() + .collect(Collectors.toSet()); } - record.put(VALUE_FIELD, values); - return cluster.createStore(KEY_COUNT, record); } - protected String buildSparseVectorStore(VeniceClusterWrapper cluster) throws Exception { - Schema schema = Schema.parse( - "{" + " \"namespace\" : \"example.avro\"," + " \"type\": \"record\"," + " \"name\": \"SparseVector\"," - + " \"fields\": [" + " { \"name\": \"index\", \"type\": {\"type\": \"array\", \"items\": \"int\"} }," - + " { \"name\": \"value\", \"type\": {\"type\": \"array\", \"items\": \"float\"} }" + " ]" + "}"); + @Benchmark + @Threads(1) + public void batchGetHitT1(BatchGetThreadContext context, Blackhole blackhole) throws Exception { + batchGetHit(context, blackhole); + } + + @Benchmark + @Threads(8) + public void batchGetHitT8(BatchGetThreadContext context, Blackhole blackhole) throws Exception { + batchGetHit(context, blackhole); + } + + protected void batchGetHit(BatchGetThreadContext context, Blackhole blackhole) throws Exception { + context.result = client.batchGet(context.keys).get(); + blackhole.consume(context.result); + Assert.assertEquals(context.result.size(), context.keys.size()); + context.result.values().forEach(Assert::assertNotNull); + } + + protected String buildDenseVectorStore(VeniceClusterWrapper cluster) { + Schema schema = AvroCompatibilityHelper.parse( + "{\"namespace\": \"example.avro\", \"type\": \"record\", \"name\": \"DenseVector\", \"fields\": [{\"name\": \"values\", \"type\": {\"type\": \"array\", \"items\": \"float\"}}]}"); GenericRecord record = new GenericData.Record(schema); - List indices = new ArrayList<>(); - List values = new ArrayList<>(); - for (int i = 0; i < valueLength; i++) { - indices.add(i); + int length = valueSize / Float.BYTES; + List values = new ArrayList<>(length); + for (int i = 0; i < length; ++i) { values.add((float) i); } - record.put("index", indices); - record.put(VALUE_FIELD, values); - return cluster.createStore(KEY_COUNT, record); + record.put("values", values); + String storeName = cluster.createStore(keyCount, record); + cluster.createMetaSystemStore(storeName); + return storeName; } } diff --git a/internal/venice-test-common/src/jmh/resources/log4j2.properties b/internal/venice-test-common/src/jmh/resources/log4j2.properties new file mode 100644 index 0000000000..5c25219b59 --- /dev/null +++ b/internal/venice-test-common/src/jmh/resources/log4j2.properties @@ -0,0 +1,30 @@ +name = benchmark +status = error + +appenders = console +appender.console.type = Console +appender.console.name = stdout +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} - [%X{component}] %p [%c{1}] [%t] %m%n + +rootLogger.level = warn +rootLogger.appenderRefs = stdout +rootLogger.appenderRef.stdout.ref = stdout + +logger.d2.name = com.linkedin.d2 +logger.d2.level = off + +logger.r2.name = com.linkedin.r2 +logger.r2.level = off + +logger.alpini.name = com.linkedin.alpini +logger.alpini.level = error + +logger.kafka.name = org.apache.kafka +logger.kafka.level = error + +logger.helix.name = org.apache.helix +logger.helix.level = error + +logger.zookeeper.name = org.apache.zookeeper +logger.zookeeper.level = error diff --git a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java index 12e7876555..6dc41c8ccf 100644 --- a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java +++ b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java @@ -27,7 +27,6 @@ import com.linkedin.davinci.storage.StorageMetadataService; import com.linkedin.davinci.store.AbstractStorageEngine; import com.linkedin.venice.ConfigKeys; -import com.linkedin.venice.common.VeniceSystemStoreType; import com.linkedin.venice.compression.CompressionStrategy; import com.linkedin.venice.compression.GzipCompressor; import com.linkedin.venice.compression.NoopCompressor; @@ -792,17 +791,6 @@ public static void shutdownExecutor(ExecutorService executor, long timeout, Time Assert.assertTrue(executor.awaitTermination(timeout, unit)); } - public static void createMetaSystemStore( - ControllerClient controllerClient, - String storeName, - Optional logger) { - String metaSystemStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName); - VersionCreationResponse response = - assertCommand(controllerClient.emptyPush(metaSystemStoreName, "testEmptyPush", 1234321)); - TestUtils.waitForNonDeterministicPushCompletion(response.getKafkaTopic(), controllerClient, 1, TimeUnit.MINUTES); - logger.ifPresent(value -> value.info("System store " + metaSystemStoreName + " is created.")); - } - public static void addIngestionIsolationToProperties(Properties properties) { properties.putAll(getIngestionIsolationPropertyMap()); }