Cache application_jar in GCS using hash appended name
itsankit-google committed Mar 20, 2023
1 parent 6d2d1b2 commit 1146a06
Showing 10 changed files with 129 additions and 42 deletions.
@@ -16,7 +16,6 @@

package io.cdap.cdap.internal.app.deploy;

- import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
@@ -58,6 +57,7 @@
import io.cdap.cdap.common.lang.jar.BundleJarUtil;
import io.cdap.cdap.common.lang.jar.ClassLoaderFolder;
import io.cdap.cdap.common.utils.DirUtils;
+ import io.cdap.cdap.common.utils.HashUtils;
import io.cdap.cdap.internal.app.deploy.pipeline.AppDeploymentInfo;
import io.cdap.cdap.internal.app.deploy.pipeline.AppDeploymentRuntimeInfo;
import io.cdap.cdap.internal.app.deploy.pipeline.AppSpecInfo;
@@ -633,33 +633,8 @@ private Map<String, String> getExtraProgramOptions(boolean isDistributed) {
private String getArtifactHash(Hasher hasher) {
String hashVal = hasher.hash().toString();
if (artifactsComputeHashTimeBucketDays > 0) {
- return timeBucketHash(hashVal, artifactsComputeHashTimeBucketDays, System.currentTimeMillis());
+ return HashUtils.timeBucketHash(hashVal, artifactsComputeHashTimeBucketDays, System.currentTimeMillis());
}
return hashVal;
}
-
- /**
- * Return timed bucketed hash value. Therefore, for a given hash, result is identical as long as call to this
- * method has happened within the given window.
- * This method also uses a jitter based on provided hash to reduce likelihood of two time bucket with different hashes
- * to be identical.
- * For a given n as the jitter chosen from 0 to window-1, time bucket windows will be as follows:
- * [window -n, 2*window -n), [2*window -n , 3*window -n), ...
- * The beginning of an above time bucket window which currentTime lies in uniquely identifies the time bucket window
- * for the given hash.
- *
- * @param hash value that needs to be time bucketed.
- * @param window in days
- * @param currentTime current time in millisecond
- * @return
- */
- @VisibleForTesting
- public static String timeBucketHash(String hash, int window, long currentTime) {
- // jitter is used to avoid having identical time bucket windows for different keys
- long jitter = TimeUnit.DAYS.toMillis(hash.hashCode() % window);
- long windowMSec = TimeUnit.DAYS.toMillis(window);
-
- long nextBucket = (currentTime / windowMSec) * windowMSec + windowMSec - jitter;
- return String.format("%s_%s", hash, nextBucket);
- }
}
@@ -32,6 +32,7 @@
import io.cdap.cdap.common.logging.LoggingContext;
import io.cdap.cdap.common.logging.LoggingContextAccessor;
import io.cdap.cdap.common.utils.DirUtils;
+ import io.cdap.cdap.common.utils.ProjectInfo;
import io.cdap.cdap.logging.context.LoggingContextHelper;
import io.cdap.cdap.proto.id.ProgramRunId;
import org.apache.hadoop.conf.Configuration;
@@ -509,14 +510,17 @@ private void createApplicationJar(ApplicationBundler bundler,
}

// The location name is computed from the MD5 of all the class names
- // The localized name is always APPLICATION_JAR
+ // The localized name is always APPLICATION_JAR when the job is launched using SSH.
List<String> classList = classes.stream().map(Class::getName).sorted().collect(Collectors.toList());
Hasher hasher = Hashing.md5().newHasher();
for (String name : classList) {
hasher.putString(name);
}
- // Only depends on class list so that it can be reused across different launches
- String name = hasher.hash().toString() + "-" + Constants.Files.APPLICATION_JAR;
+ // Add the CDAP version to the hash so that application jars are distinguishable when an upgrade happens.
+ hasher.putString(ProjectInfo.getVersion().toString());
+ // The hash depends only on the class list and the CDAP version, so the jar can be reused across launches.
+ String hashVal = hasher.hash().toString();
+ String name = hashVal + "-" + Constants.Files.APPLICATION_JAR;

LOG.debug("Create and copy {}", Constants.Files.APPLICATION_JAR);
Location location = locationCache.get(name, new LocationCache.Loader() {
@@ -529,9 +533,11 @@ public void load(String name, Location targetLocation) throws IOException {
LOG.debug("Done {}", Constants.Files.APPLICATION_JAR);

localFiles.put(Constants.Files.APPLICATION_JAR,
- createLocalFile(Constants.Files.APPLICATION_JAR, location, true));
+ createLocalFile(getApplicationJarLocalizedName(hashVal), location, true));
}

+ abstract String getApplicationJarLocalizedName(String hashVal);

private void createResourcesJar(ApplicationBundler bundler, Map<String, LocalFile> localFiles,
Path stagingDir) throws IOException {
// If there is no resources, no need to create the jar file.
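For reference, the naming scheme createApplicationJar now implements can be reproduced in isolation. A minimal sketch, assuming a hard-coded version string in place of ProjectInfo.getVersion(), the literal "application.jar" in place of Constants.Files.APPLICATION_JAR, and Guava's putString(CharSequence, Charset) overload (the diff's single-argument putString depends on the project's Guava version):

import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ApplicationJarNameSketch {
  public static void main(String[] args) {
    // Sorted class names make the hash independent of discovery order.
    List<Class<?>> classes = Arrays.asList(String.class, List.class);
    List<String> classList = classes.stream().map(Class::getName).sorted().collect(Collectors.toList());
    Hasher hasher = Hashing.md5().newHasher(); // md5 matches the diff; deprecated in newer Guava
    for (String name : classList) {
      hasher.putString(name, StandardCharsets.UTF_8);
    }
    // Mixing in the CDAP version (hard-coded here) gives upgraded clusters a fresh cache entry.
    hasher.putString("6.9.0", StandardCharsets.UTF_8);
    String name = hasher.hash().toString() + "-application.jar";
    System.out.println(name); // e.g. 3f8d...-application.jar
  }
}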
@@ -110,6 +110,11 @@ protected void addRuntimeConfigFiles(Path runtimeConfigDir) throws IOException {
saveResource(runtimeConfigDir, SETUP_SPARK_PY);
}

+ @Override
+ String getApplicationJarLocalizedName(String hashVal) {
+ return Constants.Files.APPLICATION_JAR;
+ }
+
@Override
protected void launch(TwillRuntimeSpecification twillRuntimeSpec, RuntimeSpecification runtimeSpec,
JvmOptions jvmOptions, Map<String, String> environments, Map<String, LocalFile> localFiles,
@@ -20,6 +20,8 @@
import com.google.gson.Gson;
import io.cdap.cdap.app.runtime.ProgramOptions;
import io.cdap.cdap.common.conf.CConfiguration;
+ import io.cdap.cdap.common.utils.HashUtils;
+ import io.cdap.cdap.common.utils.ProjectInfo;
import io.cdap.cdap.internal.app.runtime.ProgramOptionConstants;
import io.cdap.cdap.internal.app.runtime.distributed.runtimejob.DefaultRuntimeJobInfo;
import io.cdap.cdap.proto.id.ProgramRunId;
@@ -34,6 +36,7 @@
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.filesystem.Location;
import org.apache.twill.filesystem.LocationFactory;
+ import org.apache.twill.internal.Constants;
import org.apache.twill.internal.DefaultLocalFile;
import org.apache.twill.internal.JvmOptions;
import org.apache.twill.internal.TwillRuntimeSpecification;
@@ -63,6 +66,9 @@ class RuntimeJobTwillPreparer extends AbstractRuntimeTwillPreparer {
private final Map<String, Location> secretFiles;
private final Supplier<RuntimeJobManager> jobManagerSupplier;
private final ProgramOptions programOptions;
+ private final boolean artifactsComputeHash;
+ private final boolean artifactsComputeHashSnapshot;
+ private final int artifactsComputeHashTimeBucketDays;

RuntimeJobTwillPreparer(CConfiguration cConf, Configuration hConf,
TwillSpecification twillSpec, ProgramRunId programRunId,
@@ -73,6 +79,11 @@ class RuntimeJobTwillPreparer extends AbstractRuntimeTwillPreparer {
this.secretFiles = secretFiles;
this.jobManagerSupplier = jobManagerSupplier;
this.programOptions = programOptions;
+ this.artifactsComputeHash = cConf.getBoolean(io.cdap.cdap.common.conf.Constants.AppFabric.ARTIFACTS_COMPUTE_HASH);
+ this.artifactsComputeHashSnapshot = cConf.getBoolean(
+ io.cdap.cdap.common.conf.Constants.AppFabric.ARTIFACTS_COMPUTE_HASH_SNAPSHOT);
+ this.artifactsComputeHashTimeBucketDays = cConf.getInt(
+ io.cdap.cdap.common.conf.Constants.AppFabric.ARTIFACTS_COMPUTE_HASH_TIME_BUCKET_DAYS);
}

@Override
@@ -100,11 +111,31 @@ protected void launch(TwillRuntimeSpecification twillRuntimeSpec, RuntimeSpecifi
}
}

+ @Override
+ String getApplicationJarLocalizedName(String hashVal) {
+ ProjectInfo.Version cdapVersion = ProjectInfo.getVersion();
+ String computedHashVal = hashVal;
+ if (artifactsComputeHash && (artifactsComputeHashSnapshot || !cdapVersion.isSnapshot())) {
+ if (artifactsComputeHashTimeBucketDays > 0) {
+ computedHashVal =
+ HashUtils.timeBucketHash(
+ hashVal, artifactsComputeHashTimeBucketDays, System.currentTimeMillis());
+ }
+ }
+ return String.format("application_%s.jar", computedHashVal);
+ }
+
private RuntimeJobInfo createRuntimeJobInfo(RuntimeSpecification runtimeSpec,
Map<String, LocalFile> localFiles, String jvmOpts) {

Map<String, String> jvmProperties = parseJvmProperties(jvmOpts);
LOG.info("JVM properties {}", jvmProperties);
+ Map<String, String> runtimeJobArguments = new HashMap<>();
+ String applicationJarLocalizedName = localFiles.get(Constants.Files.APPLICATION_JAR).getName();
+ // The applicationJarLocalizedName passed in runtimeJobArguments is used by the
+ // RuntimeJobManager when launching the job, so that the jar can be added to the
+ // classpath on the runner cluster.
+ runtimeJobArguments.put(Constants.Files.APPLICATION_JAR, applicationJarLocalizedName);
+
Collection<? extends LocalFile> files =
Stream.concat(localFiles.values().stream(), runtimeSpec.getLocalFiles().stream()).collect(Collectors.toList());
@@ -118,6 +149,8 @@ private RuntimeJobInfo createRuntimeJobInfo(RuntimeSpecification runtimeSpec,
} else {
cacheableFiles = new HashSet<>();
}
+ // Add application jar to cacheable files to reuse it across launches.
+ cacheableFiles.add(applicationJarLocalizedName);

for (LocalFile f : files) {
if (!cacheableFiles.contains(f.getName())) {
@@ -127,7 +160,7 @@ private RuntimeJobInfo createRuntimeJobInfo(RuntimeSpecification runtimeSpec,
}
}

- return new DefaultRuntimeJobInfo(getProgramRunId(), resultingFiles, jvmProperties);
+ return new DefaultRuntimeJobInfo(getProgramRunId(), resultingFiles, jvmProperties, runtimeJobArguments);
}

/**
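The override above is the crux of the caching change: when hashing is enabled (and the build is not a snapshot, unless snapshot hashing is turned on), the localized jar name embeds the optionally time-bucketed hash. A minimal sketch of the resulting name, assuming the HashUtils class added by this commit is on the classpath, a 15-day bucket, and an illustrative hash value:

import io.cdap.cdap.common.utils.HashUtils;

public class LocalizedJarNameSketch {
  public static void main(String[] args) {
    String hashVal = "3f8d0c2a"; // illustrative artifact hash
    String bucketed = HashUtils.timeBucketHash(hashVal, 15, System.currentTimeMillis());
    // e.g. application_3f8d0c2a_1679270400000.jar; stable within the 15-day bucket,
    // so Dataproc launches inside that window hit the same cached GCS object.
    System.out.println(String.format("application_%s.jar", bucketed));
  }
}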
@@ -35,10 +35,16 @@ public class DefaultRuntimeJobInfo implements RuntimeJobInfo {
private final ProgramRunInfo info;
private final Collection<? extends LocalFile> files;
private final Map<String, String> jvmProperties;
+ private final Map<String, String> arguments;

public DefaultRuntimeJobInfo(ProgramRunId programRunId, Collection<? extends LocalFile> files,
Map<String, String> jvmProperties) {
+ this(programRunId, files, jvmProperties, Collections.emptyMap());
+ }
+
+ public DefaultRuntimeJobInfo(ProgramRunId programRunId, Collection<? extends LocalFile> files,
+ Map<String, String> jvmProperties, Map<String, String> arguments) {
this.info = new ProgramRunInfo.Builder()
.setNamespace(programRunId.getNamespace())
.setApplication(programRunId.getApplication())
@@ -48,6 +54,7 @@ public DefaultRuntimeJobInfo(ProgramRunId programRunId, Collection<? extends Loc
.setRun(programRunId.getRun()).build();
this.files = Collections.unmodifiableCollection(new ArrayList<>(files));
this.jvmProperties = Collections.unmodifiableMap(new LinkedHashMap<>(jvmProperties));
+ this.arguments = Collections.unmodifiableMap(arguments);
}

@Override
@@ -69,4 +76,9 @@ public String getRuntimeJobClassname() {
public Map<String, String> getJvmProperties() {
return jvmProperties;
}
+
+ @Override
+ public Map<String, String> getArguments() {
+ return arguments;
+ }
}
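The delegating constructor keeps existing callers source-compatible while new callers can thread arguments through. A hypothetical construction (the "application.jar" key assumes the value of Twill's Constants.Files.APPLICATION_JAR):

RuntimeJobInfo info = new DefaultRuntimeJobInfo(
    programRunId, localFiles.values(), jvmProperties,
    Collections.singletonMap("application.jar", "application_3f8d0c2a_1679270400000.jar"));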
45 changes: 45 additions & 0 deletions cdap-common/src/main/java/io/cdap/cdap/common/utils/HashUtils.java
@@ -0,0 +1,45 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.common.utils;

import java.util.concurrent.TimeUnit;

/** Utility class for Hash-related operations. */
public class HashUtils {

/**
* Returns a time-bucketed hash value: for a given hash, the result is identical as long as this
* method is called within the same time window. A jitter derived from the provided hash reduces
* the likelihood that the time buckets of two different hashes are identical.
* For a jitter n chosen from 0 to window-1, the time bucket windows are:
* [window - n, 2*window - n), [2*window - n, 3*window - n), ... The beginning of the window that
* currentTime lies in uniquely identifies the time bucket window for the given hash.
*
* @param hash value that needs to be time bucketed
* @param window window length in days
* @param currentTime current time in milliseconds
* @return the hash with its time bucket identifier appended
*/
public static String timeBucketHash(String hash, int window, long currentTime) {
// jitter is used to avoid having identical time bucket windows for different keys
long jitter = TimeUnit.DAYS.toMillis(hash.hashCode() % window);
long windowMillis = TimeUnit.DAYS.toMillis(window);

long nextBucket = (currentTime / windowMillis) * windowMillis + windowMillis - jitter;
return String.format("%s_%s", hash, nextBucket);
}
}
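A quick sketch of the bucketing behaviour (the demo class and hash literals are illustrative; it assumes the HashUtils class above is on the classpath): calls within one window return the same value, so a cache keyed on the result stays stable for up to window days per hash.

import io.cdap.cdap.common.utils.HashUtils;
import java.util.concurrent.TimeUnit;

public class TimeBucketHashSketch {
  public static void main(String[] args) {
    int window = 15; // days
    long t0 = System.currentTimeMillis();
    String a = HashUtils.timeBucketHash("abc123", window, t0);
    String b = HashUtils.timeBucketHash("abc123", window, t0 + TimeUnit.DAYS.toMillis(1));
    String c = HashUtils.timeBucketHash("abc123", window, t0 + TimeUnit.DAYS.toMillis(40));
    // a and b normally match (same window, unless t0 falls within a day of a boundary);
    // c lands in a later window, so its suffix differs.
    System.out.println(a + "\n" + b + "\n" + c);
  }
}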
@@ -1,5 +1,5 @@
/*
- * Copyright © 2022 Cask Data, Inc.
+ * Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
@@ -14,7 +14,7 @@
* the License.
*/

- package io.cdap.cdap.internal.app.deploy;
+ package io.cdap.cdap.common.utils;

import org.junit.Assert;
import org.junit.Test;
@@ -23,7 +23,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

- public class InMemoryProgramRunDispatcherTest {
+ public class HashUtilsTest {

@Test
public void testTimeBucketHash() {
@@ -33,22 +33,19 @@ public void testTimeBucketHash() {
Set<String> results2 = new HashSet<>();
long dayInMilliSec = TimeUnit.DAYS.toMillis(1);
for (int i = 0; i < 15; i++) {
- results1.add(InMemoryProgramRunDispatcher.timeBucketHash("hash1", window, currentTime + (i * dayInMilliSec)));
- results2.add(InMemoryProgramRunDispatcher.timeBucketHash("hash2", window, currentTime + (i * dayInMilliSec)));
+ results1.add(HashUtils.timeBucketHash("hash1", window, currentTime + (i * dayInMilliSec)));
+ results2.add(HashUtils.timeBucketHash("hash2", window, currentTime + (i * dayInMilliSec)));
}

// for a 15-day window, we should have at most 2 time buckets
Assert.assertTrue(results1.size() >= 1 && results1.size() <= 2);
Assert.assertTrue(results2.size() >= 1 && results2.size() <= 2);

for (String res : results1) {
results2.remove(res);
}

for (String res : results2) {
results1.remove(res);
}

// ensures that jitter works so all keys do not end up in the same bucket
Assert.assertNotEquals(results1.size(), 0);
Assert.assertNotEquals(results2.size(), 0);
@@ -68,6 +68,10 @@ public static void main(String[] args) throws Exception {
if (!arguments.containsKey(SPARK_COMPAT)) {
throw new RuntimeException("Missing --" + SPARK_COMPAT + " argument for the spark compat version");
}
+ if (!arguments.containsKey(Constants.Files.APPLICATION_JAR)) {
+ throw new RuntimeException(
+ String.format("Missing --%s argument for the application jar name", Constants.Files.APPLICATION_JAR));
+ }

Thread.setDefaultUncaughtExceptionHandler((t, e) -> LOG.error("Uncaught exception from thread {}", t, e));

@@ -83,6 +87,7 @@

String runtimeJobClassName = arguments.get(RUNTIME_JOB_CLASS).iterator().next();
String sparkCompat = arguments.get(SPARK_COMPAT).iterator().next();
+ String applicationJarLocalizedName = arguments.get(Constants.Files.APPLICATION_JAR).iterator().next();

ClassLoader cl = DataprocJobMain.class.getClassLoader();
if (!(cl instanceof URLClassLoader)) {
@@ -91,7 +96,7 @@

// create classpath from resources, application and twill jars
URL[] urls = getClasspath((URLClassLoader) cl, Arrays.asList(Constants.Files.RESOURCES_JAR,
- Constants.Files.APPLICATION_JAR,
+ applicationJarLocalizedName,
Constants.Files.TWILL_JAR));
Arrays.stream(urls).forEach(url -> LOG.debug("Classpath URL: {}", url));

@@ -130,7 +130,7 @@ public class DataprocRuntimeJobManager implements RuntimeJobManager {
private volatile ClusterControllerClient clusterControllerClient;
// CDAP specific artifacts which will be cached in GCS.
private static final List<String> artifactsCacheablePerCDAPVersion = new ArrayList<>(
- Arrays.asList(Constants.Files.TWILL_JAR, Constants.Files.LAUNCHER_JAR, Constants.Files.APPLICATION_JAR)
+ Arrays.asList(Constants.Files.TWILL_JAR, Constants.Files.LAUNCHER_JAR)
);
private static final int SNAPSHOT_EXPIRE_DAYS = 7;
private static final int EXPIRE_DAYS = 730;
@@ -631,6 +631,7 @@ private InputStream openStream(URI uri) throws IOException {
*/
private SubmitJobRequest getSubmitJobRequest(RuntimeJobInfo runtimeJobInfo,
List<LocalFile> localFiles) throws IOException {
+ String applicationJarLocalizedName = runtimeJobInfo.getArguments().get(Constants.Files.APPLICATION_JAR);
ProgramRunInfo runInfo = runtimeJobInfo.getProgramRunInfo();
String runId = runInfo.getRun();

@@ -645,6 +646,7 @@ private SubmitJobRequest getSubmitJobRequest(RuntimeJobInfo runtimeJobInfo,
for (Map.Entry<String, String> entry : runtimeJobInfo.getJvmProperties().entrySet()) {
arguments.add("--" + DataprocJobMain.PROPERTY_PREFIX + entry.getKey() + "=\"" + entry.getValue() + "\"");
}
arguments.add("--" + Constants.Files.APPLICATION_JAR + "=" + applicationJarLocalizedName);

Map<String, String> properties = new LinkedHashMap<>();
properties.put(CDAP_RUNTIME_NAMESPACE, runInfo.getNamespace());
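Taken together with DataprocJobMain above, the localized jar name crosses the process boundary as a plain CLI flag. A hedged sketch of the handoff, assuming Twill's Constants.Files.APPLICATION_JAR resolves to "application.jar" and an illustrative name:

// Manager side (getSubmitJobRequest): forward the cached jar's name to the job.
String applicationJarLocalizedName = "application_3f8d0c2a_1679270400000.jar"; // illustrative
java.util.List<String> arguments = new java.util.ArrayList<>();
arguments.add("--application.jar=" + applicationJarLocalizedName);
// Launcher side (DataprocJobMain): the flag is parsed back out and the jar is placed on the
// classpath next to resources.jar and twill.jar, instead of the previously fixed name.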
@@ -50,4 +50,11 @@ public interface RuntimeJobInfo {
default Map<String, String> getJvmProperties() {
return Collections.emptyMap();
}
+
+ /**
+ * Returns arguments for the process that runs the {@link RuntimeJob}.
+ */
+ default Map<String, String> getArguments() {
+ return Collections.emptyMap();
+ }
}
