Skip to content

Commit

Permalink
[test] Fix and improve DaVinciClientBenchmark (linkedin#410)
Browse files Browse the repository at this point in the history
[test] Fix and improve DaVinciClientBenchmark
  • Loading branch information
anaberezhnov authored May 10, 2023
1 parent 262759f commit 74075df
Show file tree
Hide file tree
Showing 17 changed files with 350 additions and 261 deletions.
3 changes: 1 addition & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ plugins {
id 'com.github.spotbugs' version '4.8.0' apply false
id 'org.gradle.test-retry' version '1.5.0' apply false
id 'com.form.diff-coverage' version '0.9.5' apply false
id 'me.champeau.jmh' version '0.6.7' apply false
}

apply from: "$rootDir/gradle/helper/git.gradle"
Expand Down Expand Up @@ -78,8 +79,6 @@ ext.libraries = [
javax: 'javax.servlet:javax.servlet-api:3.1.0',
javaxActivation: 'com.sun.activation:javax.activation:1.2.0',
jdom: 'org.jdom:jdom:1.1',
jmhCore: 'org.openjdk.jmh:jmh-core:1.28',
jmhGenerator: 'org.openjdk.jmh:jmh-generator-annprocess:1.28',
jna: 'net.java.dev.jna:jna:4.5.1',
jsr305: 'com.google.code.findbugs:jsr305:3.0.2',
joptSimple: 'net.sf.jopt-simple:jopt-simple:3.2',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.controllerapi.NewStoreResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.exceptions.VeniceException;
Expand Down Expand Up @@ -95,38 +93,26 @@ public VeniceClusterInitializer(String storeName, int routerPort) {
this.storeName = storeName;
// Create test store
this.controllerClient = this.veniceCluster.getControllerClient();
NewStoreResponse newStoreResponse =
controllerClient.createNewStore(storeName, "test_owner", KEY_SCHEMA_STR, VALUE_SCHEMA_STR);
if (newStoreResponse.isError()) {
throw new VeniceException(
"Failed to create the store: " + storeName + ", and the error: " + newStoreResponse.getError());
}
TestUtils.createMetaSystemStore(controllerClient, storeName, Optional.of(LOGGER));
TestUtils.assertCommand(controllerClient.createNewStore(storeName, "test_owner", KEY_SCHEMA_STR, VALUE_SCHEMA_STR));
this.veniceCluster.createMetaSystemStore(storeName);
// Enable read compute
UpdateStoreQueryParams params = new UpdateStoreQueryParams();
params.setReadComputationEnabled(true);
ControllerResponse updateStoreResponse = controllerClient.updateStore(storeName, params);
if (updateStoreResponse.isError()) {
throw new VeniceException(
"Failed to update store: " + storeName + ", and the error: " + updateStoreResponse.getError());
}
VersionCreationResponse newVersion = controllerClient.requestTopicForWrites(
storeName,
10240000,
Version.PushType.BATCH,
Version.guidBasedDummyPushId(),
true,
false,
false,
Optional.empty(),
Optional.empty(),
Optional.empty(),
false,
-1);
if (newVersion.isError()) {
throw new VeniceException(
"Failed to create a new version for store: " + storeName + ", and error is: " + newVersion.getError());
}
TestUtils.assertCommand(controllerClient.updateStore(storeName, params));
VersionCreationResponse newVersion = TestUtils.assertCommand(
controllerClient.requestTopicForWrites(
storeName,
10240000,
Version.PushType.BATCH,
Version.guidBasedDummyPushId(),
true,
false,
false,
Optional.empty(),
Optional.empty(),
Optional.empty(),
false,
-1));
this.pushVersion = newVersion.getVersion();
this.pushVersionTopic = newVersion.getKafkaTopic();
this.valueSchemaId = HelixReadOnlySchemaRepository.VALUE_SCHEMA_STARTING_ID;
Expand Down
48 changes: 28 additions & 20 deletions internal/venice-test-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,29 @@ import java.nio.file.StandardCopyOption
import java.nio.file.StandardOpenOption


// Apply the 'me.champeau.jmh' Gradle plugin: it registers the jmh source set,
// the jmh* dependency configurations, and the `jmh` task used to run benchmarks
// (replacing the hand-rolled JavaExec `jmh` task this commit removes).
apply {
plugin 'me.champeau.jmh'
}

// Benchmark-run configuration consumed by the me.champeau.jmh plugin.
jmh {
fork = 3 // measurement forks per benchmark (fresh JVM each)
warmupForks = 1 // extra fork used only for warmup
iterations = 10 // measurement iterations per fork
warmupIterations = 5 // warmup iterations per fork (not measured)
timeUnit = 'ns' // report results in nanoseconds
resultFormat = 'json' // machine-readable results for later analysis
failOnError = true // abort the run if any benchmark throws
includeTests = false // do not put test classes on the benchmark classpath
profilers = ['gc'] // attach the JMH GC profiler (allocation stats)
benchmarkMode = ['sample'] // sample-time mode: latency distribution, not throughput
includes = ['DaVinciClientBenchmark'] // only run this benchmark class by default
// Fixed 4G heap for stable results; zero shutdown timeouts so forked JVMs
// exit immediately instead of waiting on lingering non-daemon threads.
jvmArgs = ['-Xms4G', '-Xmx4G', '-Djmh.shutdownTimeout=0', '-Djmh.shutdownTimeout.step=0']
}

// The JMH uber-jar bundles the full integration-test classpath; enable zip64
// so the archive may exceed the standard zip entry/size limits without failing.
jmhJar {
zip64 = true
}

configurations {
all {
resolutionStrategy {
Expand All @@ -16,16 +39,11 @@ configurations {
exclude group: 'org.apache.kafka'
exclude group: 'org.mortbay.jetty', module: 'servlet-api'
}
jmhImplementation.extendsFrom testImplementation
integrationTestImplementation.extendsFrom testImplementation
integrationTestUtils
}

sourceSets {
jmh {
java.srcDirs = ['src/jmh/java']
}

integrationTest {
java.srcDirs = ['src/integrationtest/java']
resources.srcDirs = ['src/integrationtest/resources']
Expand Down Expand Up @@ -58,7 +76,9 @@ dependencies {
implementation libraries.commonsCli
implementation libraries.conscrypt
implementation libraries.fastUtil
implementation libraries.hadoopCommon
implementation (libraries.hadoopCommon) {
exclude group: 'javax.servlet'
}
implementation libraries.helix
implementation libraries.httpAsyncClient
implementation libraries.javax
Expand All @@ -77,23 +97,12 @@ dependencies {
}
testImplementation project(':clients:venice-admin-tool')
testImplementation project(':internal:alpini:common:alpini-common-base')
testImplementation project(":internal:venice-common").sourceSets.test.output

jmhAnnotationProcessor libraries.jmhGenerator
jmhImplementation libraries.jmhCore
testImplementation project(':internal:venice-common').sourceSets.test.output

jmhAnnotationProcessor 'org.openjdk.jmh:jmh-generator-annprocess:' + jmh.jmhVersion.get()
jmhImplementation project(path: ':internal:venice-test-common', configuration: 'integrationTestUtils')
}

task jmh(type: JavaExec, dependsOn: jmhClasses) {
main = 'org.openjdk.jmh.Main'

// In order to run just one test from the command line, specify it here, and run ./gradlew internal:venice-test-common:jmh
// main = 'com.linkedin.venice.benchmark.ZstdDecompressionBenchmark'

classpath = sourceSets.jmh.runtimeClasspath
}

def integrationTestConfigs = {
mustRunAfter test
classpath = sourceSets.integrationTest.runtimeClasspath
Expand Down Expand Up @@ -326,7 +335,6 @@ ext {
diffCoverageThreshold = 0.00
}


publishing {
publications {
"${project.name}" (MavenPublication) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ public void testObjectReuse(DaVinciConfig clientConfig) throws Exception {
final GenericRecord value = new GenericData.Record(schema);
value.put("number", 10);
String storeName = cluster.createStore(KEY_COUNT, value);
cluster.useControllerClient(client -> TestUtils.createMetaSystemStore(client, storeName, Optional.of(LOGGER)));
cluster.createMetaSystemStore(storeName);

String baseDataPath = Utils.getTempDataDirectory().getAbsolutePath();
VeniceProperties backendConfig = new PropertyBuilder().put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true)
Expand Down Expand Up @@ -943,7 +943,7 @@ private void setupHybridStore(String storeName, Consumer<UpdateStoreQueryParams>
paramsConsumer.accept(params);
cluster.useControllerClient(client -> {
client.createNewStore(storeName, "owner", DEFAULT_KEY_SCHEMA, DEFAULT_VALUE_SCHEMA);
TestUtils.createMetaSystemStore(client, storeName, Optional.of(LOGGER));
cluster.createMetaSystemStore(storeName);
client.updateStore(storeName, params);
cluster.createVersion(storeName, DEFAULT_KEY_SCHEMA, DEFAULT_VALUE_SCHEMA, Stream.of());
SystemProducer producer = IntegrationTestPushUtils.getSamzaProducer(
Expand Down Expand Up @@ -995,11 +995,8 @@ private void setUpStore(
paramsConsumer.accept(params);
try (ControllerClient controllerClient =
createStoreForJob(cluster, DEFAULT_KEY_SCHEMA, "\"string\"", vpjProperties)) {
TestUtils.createMetaSystemStore(controllerClient, storeName, Optional.of(LOGGER));
ControllerResponse response = controllerClient.updateStore(storeName, params);
Assert.assertFalse(response.isError(), response.getError());

// Push data through VPJ.
cluster.createMetaSystemStore(storeName);
TestUtils.assertCommand(controllerClient.updateStore(storeName, params));
runVPJ(vpjProperties, 1, cluster);
}
}
Expand All @@ -1015,7 +1012,7 @@ private static void runVPJ(Properties vpjProperties, int expectedVersionNumber,

private String createStoreWithMetaSystemStore(int keyCount) throws Exception {
String storeName = cluster.createStore(keyCount);
cluster.useControllerClient(client -> TestUtils.createMetaSystemStore(client, storeName, Optional.of(LOGGER)));
cluster.createMetaSystemStore(storeName);
return storeName;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,14 @@ public void verifyPostConditions(Method method) {
// @Test(timeOut = TEST_TIMEOUT * 2)
public void testComputeOnStoreWithQTFDScompliantSchema() throws Exception {
final String storeName = Utils.getUniqueString("store");
cluster.useControllerClient(client -> {
TestUtils.assertCommand(
client.createNewStore(
storeName,
getClass().getName(),
DEFAULT_KEY_SCHEMA,
VALUE_SCHEMA_FOR_COMPUTE_NULLABLE_LIST_FIELD));
TestUtils.createMetaSystemStore(client, storeName, Optional.of(LOGGER));
});
cluster.useControllerClient(
client -> TestUtils.assertCommand(
client.createNewStore(
storeName,
getClass().getName(),
DEFAULT_KEY_SCHEMA,
VALUE_SCHEMA_FOR_COMPUTE_NULLABLE_LIST_FIELD)));
cluster.createMetaSystemStore(storeName);

VersionCreationResponse newVersion = cluster.getNewVersion(storeName);
String topic = newVersion.getKafkaTopic();
Expand Down Expand Up @@ -278,11 +277,12 @@ public void testComputeOnStoreWithQTFDScompliantSchema() throws Exception {
public void testReadComputeMissingField() throws Exception {
// Create DaVinci store
final String storeName = Utils.getUniqueString("store");
cluster.useControllerClient(client -> {
TestUtils.assertCommand(
client.createNewStore(storeName, getClass().getName(), DEFAULT_KEY_SCHEMA, VALUE_SCHEMA_FOR_COMPUTE));
TestUtils.createMetaSystemStore(client, storeName, Optional.of(LOGGER));
});
cluster.useControllerClient(
client -> TestUtils.assertCommand(
client.createNewStore(storeName, getClass().getName(), DEFAULT_KEY_SCHEMA, VALUE_SCHEMA_FOR_COMPUTE))

);
cluster.createMetaSystemStore(storeName);

VersionCreationResponse newVersion = cluster.getNewVersion(storeName);
String topic = newVersion.getKafkaTopic();
Expand Down Expand Up @@ -411,11 +411,14 @@ public void testReadComputeMissingField() throws Exception {
public void testReadComputeSwappedFields() throws Exception {
// Create DaVinci store
final String storeName = Utils.getUniqueString("store");
cluster.useControllerClient(client -> {
TestUtils.assertCommand(
client.createNewStore(storeName, getClass().getName(), DEFAULT_KEY_SCHEMA, VALUE_SCHEMA_FOR_COMPUTE_SWAPPED));
TestUtils.createMetaSystemStore(client, storeName, Optional.of(LOGGER));
});
cluster.useControllerClient(
client -> TestUtils.assertCommand(
client.createNewStore(
storeName,
getClass().getName(),
DEFAULT_KEY_SCHEMA,
VALUE_SCHEMA_FOR_COMPUTE_SWAPPED)));
cluster.createMetaSystemStore(storeName);

VersionCreationResponse newVersion = cluster.getNewVersion(storeName);
String topic = newVersion.getKafkaTopic();
Expand Down Expand Up @@ -520,15 +523,14 @@ public void testReadComputeSwappedFields() throws Exception {
public void testComputeStreamingExecute() throws ExecutionException, InterruptedException {
// Setup Store
final String storeName = Utils.getUniqueString("store");
cluster.useControllerClient(client -> {
TestUtils.assertCommand(
client.createNewStore(
storeName,
getClass().getName(),
KEY_SCHEMA_STEAMING_COMPUTE,
VALUE_SCHEMA_STREAMING_COMPUTE));
TestUtils.createMetaSystemStore(client, storeName, Optional.of(LOGGER));
});
cluster.useControllerClient(
client -> TestUtils.assertCommand(
client.createNewStore(
storeName,
getClass().getName(),
KEY_SCHEMA_STEAMING_COMPUTE,
VALUE_SCHEMA_STREAMING_COMPUTE)));
cluster.createMetaSystemStore(storeName);

VersionCreationResponse newVersion = cluster.getNewVersion(storeName);
String topic = newVersion.getKafkaTopic();
Expand Down Expand Up @@ -619,15 +621,14 @@ public void onCompletion(Optional<Exception> exception) {
@Test(timeOut = TEST_TIMEOUT)
public void testPartialKeyLookupWithRocksDBBlockBasedTable() throws ExecutionException, InterruptedException {
final String storeName = Utils.getUniqueString("store");
cluster.useControllerClient(client -> {
TestUtils.assertCommand(
client.createNewStore(
storeName,
getClass().getName(),
KEY_SCHEMA_PARTIAL_KEY_LOOKUP,
VALUE_SCHEMA_FOR_COMPUTE));
TestUtils.createMetaSystemStore(client, storeName, Optional.of(LOGGER));
});
cluster.useControllerClient(
client -> TestUtils.assertCommand(
client.createNewStore(
storeName,
getClass().getName(),
KEY_SCHEMA_PARTIAL_KEY_LOOKUP,
VALUE_SCHEMA_FOR_COMPUTE)));
cluster.createMetaSystemStore(storeName);

VersionCreationResponse newVersion = cluster.getNewVersion(storeName);
String topic = newVersion.getKafkaTopic();
Expand Down Expand Up @@ -720,15 +721,14 @@ public void onCompletion(Optional<Exception> exception) {
@Test(timeOut = TEST_TIMEOUT)
public void testPartialKeyLookupWithRocksDBPlainTable() throws ExecutionException, InterruptedException {
final String storeName = Utils.getUniqueString("store");
cluster.useControllerClient(client -> {
TestUtils.assertCommand(
client.createNewStore(
storeName,
getClass().getName(),
KEY_SCHEMA_PARTIAL_KEY_LOOKUP,
VALUE_SCHEMA_FOR_COMPUTE));
TestUtils.createMetaSystemStore(client, storeName, Optional.of(LOGGER));
});
cluster.useControllerClient(
client -> TestUtils.assertCommand(
client.createNewStore(
storeName,
getClass().getName(),
KEY_SCHEMA_PARTIAL_KEY_LOOKUP,
VALUE_SCHEMA_FOR_COMPUTE)));
cluster.createMetaSystemStore(storeName);

VersionCreationResponse newVersion = cluster.getNewVersion(storeName);
String topic = newVersion.getKafkaTopic();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import com.linkedin.davinci.client.DaVinciConfig;
import com.linkedin.davinci.client.factory.CachingDaVinciClientFactory;
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.controllerapi.NewStoreResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.exceptions.VeniceException;
Expand Down Expand Up @@ -93,13 +92,9 @@ public void verifyPostConditions(Method method) {
public void testLiveUpdateSuppression(IngestionMode ingestionMode) throws Exception {
final String storeName = Utils.getUniqueString("store");
cluster.useControllerClient(client -> {
NewStoreResponse response =
client.createNewStore(storeName, getClass().getName(), DEFAULT_KEY_SCHEMA, DEFAULT_VALUE_SCHEMA);
if (response.isError()) {
throw new VeniceException(response.getError());
}
TestUtils.createMetaSystemStore(client, storeName, Optional.of(LOGGER));
// Update to hybrid store
TestUtils.assertCommand(
client.createNewStore(storeName, getClass().getName(), DEFAULT_KEY_SCHEMA, DEFAULT_VALUE_SCHEMA));
cluster.createMetaSystemStore(storeName);
client.updateStore(
storeName,
new UpdateStoreQueryParams().setHybridRewindSeconds(10).setHybridOffsetLagThreshold(10));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void setUpStore() {
String owner = "test";
// set up push status store
TestUtils.assertCommand(controllerClient.createNewStore(storeName, owner, DEFAULT_KEY_SCHEMA, "\"string\""));
TestUtils.createMetaSystemStore(controllerClient, storeName, Optional.of(LOGGER));
cluster.createMetaSystemStore(storeName);
TestUtils.assertCommand(
controllerClient.updateStore(
storeName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,6 @@ public void testNativeReplicationWithIngestionIsolationInDaVinci() throws Except
Assert.assertEquals(job.getKafkaUrl(), childDatacenters.get(0).getKafkaBrokerWrapper().getAddress());
}

// Setup meta system store for Da Vinci usage.
TestUtils.createMetaSystemStore(parentControllerClient, storeName, Optional.of(LOGGER));

// Test Da-vinci client is able to consume from NR region which is consuming remotely
VeniceMultiClusterWrapper childDataCenter = childDatacenters.get(1);

Expand Down
Loading

0 comments on commit 74075df

Please sign in to comment.