diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/deploy/InMemoryProgramRunDispatcher.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/deploy/InMemoryProgramRunDispatcher.java index 86366e4f9ee3..f34514ec3935 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/deploy/InMemoryProgramRunDispatcher.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/deploy/InMemoryProgramRunDispatcher.java @@ -16,7 +16,6 @@ package io.cdap.cdap.internal.app.deploy; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Strings; import com.google.common.base.Throwables; @@ -58,6 +57,7 @@ import io.cdap.cdap.common.lang.jar.BundleJarUtil; import io.cdap.cdap.common.lang.jar.ClassLoaderFolder; import io.cdap.cdap.common.utils.DirUtils; +import io.cdap.cdap.common.utils.HashUtils; import io.cdap.cdap.internal.app.deploy.pipeline.AppDeploymentInfo; import io.cdap.cdap.internal.app.deploy.pipeline.AppDeploymentRuntimeInfo; import io.cdap.cdap.internal.app.deploy.pipeline.AppSpecInfo; @@ -633,33 +633,8 @@ private Map getExtraProgramOptions(boolean isDistributed) { private String getArtifactHash(Hasher hasher) { String hashVal = hasher.hash().toString(); if (artifactsComputeHashTimeBucketDays > 0) { - return timeBucketHash(hashVal, artifactsComputeHashTimeBucketDays, System.currentTimeMillis()); + return HashUtils.timeBucketHash(hashVal, artifactsComputeHashTimeBucketDays, System.currentTimeMillis()); } return hashVal; } - - /** - * Return timed bucketed hash value. Therefore, for a given hash, result is identical as long as call to this - * method has happened within the given window. - * This method also uses a jitter based on provided hash to reduce likelihood of two time bucket with different hashes - * to be identical. - * For a given n as the jitter chosen from 0 to window-1, time bucket windows will be as follows: - * [window -n, 2*window -n), [2*window -n , 3*window -n), ... - * The beginning of an above time bucket window which currentTime lies in uniquely identifies the time bucket window - * for the given hash. - * - * @param hash value that needs to be time bucketed. - * @param window in days - * @param currentTime current time in millisecond - * @return - */ - @VisibleForTesting - public static String timeBucketHash(String hash, int window, long currentTime) { - // jitter is used to avoid having identical time bucket windows for different keys - long jitter = TimeUnit.DAYS.toMillis(hash.hashCode() % window); - long windowMSec = TimeUnit.DAYS.toMillis(window); - - long nextBucket = (currentTime / windowMSec) * windowMSec + windowMSec - jitter; - return String.format("%s_%s", hash, nextBucket); - } } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/AbstractRuntimeTwillPreparer.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/AbstractRuntimeTwillPreparer.java index a00f59fa8ac3..0f039eee288f 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/AbstractRuntimeTwillPreparer.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/AbstractRuntimeTwillPreparer.java @@ -32,6 +32,7 @@ import io.cdap.cdap.common.logging.LoggingContext; import io.cdap.cdap.common.logging.LoggingContextAccessor; import io.cdap.cdap.common.utils.DirUtils; +import io.cdap.cdap.common.utils.ProjectInfo; import io.cdap.cdap.logging.context.LoggingContextHelper; import io.cdap.cdap.proto.id.ProgramRunId; import org.apache.hadoop.conf.Configuration; @@ -509,14 +510,17 @@ private void createApplicationJar(ApplicationBundler bundler, } // The location name is computed from the MD5 of all the classes names - // The localized name is always APPLICATION_JAR + // The localized name is always APPLICATION_JAR when the job is launched using SSH. List classList = classes.stream().map(Class::getName).sorted().collect(Collectors.toList()); Hasher hasher = Hashing.md5().newHasher(); for (String name : classList) { hasher.putString(name); } - // Only depends on class list so that it can be reused across different launches - String name = hasher.hash().toString() + "-" + Constants.Files.APPLICATION_JAR; + // Add cdap version to the hash so that application jars are distinguishable when upgrade happens. + hasher.putString(ProjectInfo.getVersion().toString()); + // Only depends on class list and cdap version so that it can be reused across different launches + String hashVal = hasher.hash().toString(); + String name = hashVal + "-" + Constants.Files.APPLICATION_JAR; LOG.debug("Create and copy {}", Constants.Files.APPLICATION_JAR); Location location = locationCache.get(name, new LocationCache.Loader() { @@ -529,9 +533,11 @@ public void load(String name, Location targetLocation) throws IOException { LOG.debug("Done {}", Constants.Files.APPLICATION_JAR); localFiles.put(Constants.Files.APPLICATION_JAR, - createLocalFile(Constants.Files.APPLICATION_JAR, location, true)); + createLocalFile(getApplicationJarLocalizedName(hashVal), location, true)); } + abstract String getApplicationJarLocalizedName(String hashVal); + private void createResourcesJar(ApplicationBundler bundler, Map localFiles, Path stagingDir) throws IOException { // If there is no resources, no need to create the jar file. diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RemoteExecutionTwillPreparer.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RemoteExecutionTwillPreparer.java index cecf8d41beb1..d22f86e1864b 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RemoteExecutionTwillPreparer.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RemoteExecutionTwillPreparer.java @@ -110,6 +110,11 @@ protected void addRuntimeConfigFiles(Path runtimeConfigDir) throws IOException { saveResource(runtimeConfigDir, SETUP_SPARK_PY); } + @Override + String getApplicationJarLocalizedName(String hashVal) { + return Constants.Files.APPLICATION_JAR; + } + @Override protected void launch(TwillRuntimeSpecification twillRuntimeSpec, RuntimeSpecification runtimeSpec, JvmOptions jvmOptions, Map environments, Map localFiles, diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RuntimeJobTwillPreparer.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RuntimeJobTwillPreparer.java index 034e1c0c3d66..062323d4f807 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RuntimeJobTwillPreparer.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/remote/RuntimeJobTwillPreparer.java @@ -20,6 +20,8 @@ import com.google.gson.Gson; import io.cdap.cdap.app.runtime.ProgramOptions; import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.utils.HashUtils; +import io.cdap.cdap.common.utils.ProjectInfo; import io.cdap.cdap.internal.app.runtime.ProgramOptionConstants; import io.cdap.cdap.internal.app.runtime.distributed.runtimejob.DefaultRuntimeJobInfo; import io.cdap.cdap.proto.id.ProgramRunId; @@ -34,6 +36,7 @@ import org.apache.twill.api.TwillSpecification; import org.apache.twill.filesystem.Location; import org.apache.twill.filesystem.LocationFactory; +import org.apache.twill.internal.Constants; import org.apache.twill.internal.DefaultLocalFile; import org.apache.twill.internal.JvmOptions; import org.apache.twill.internal.TwillRuntimeSpecification; @@ -63,6 +66,9 @@ class RuntimeJobTwillPreparer extends AbstractRuntimeTwillPreparer { private final Map secretFiles; private final Supplier jobManagerSupplier; private final ProgramOptions programOptions; + private final boolean artifactsComputeHash; + private final boolean artifactsComputeHashSnapshot; + private final int artifactsComputeHashTimeBucketDays; RuntimeJobTwillPreparer(CConfiguration cConf, Configuration hConf, TwillSpecification twillSpec, ProgramRunId programRunId, @@ -73,6 +79,11 @@ class RuntimeJobTwillPreparer extends AbstractRuntimeTwillPreparer { this.secretFiles = secretFiles; this.jobManagerSupplier = jobManagerSupplier; this.programOptions = programOptions; + this.artifactsComputeHash = cConf.getBoolean(io.cdap.cdap.common.conf.Constants.AppFabric.ARTIFACTS_COMPUTE_HASH); + this.artifactsComputeHashSnapshot = cConf.getBoolean( + io.cdap.cdap.common.conf.Constants.AppFabric.ARTIFACTS_COMPUTE_HASH_SNAPSHOT); + this.artifactsComputeHashTimeBucketDays = cConf.getInt( + io.cdap.cdap.common.conf.Constants.AppFabric.ARTIFACTS_COMPUTE_HASH_TIME_BUCKET_DAYS); } @Override @@ -100,11 +111,31 @@ protected void launch(TwillRuntimeSpecification twillRuntimeSpec, RuntimeSpecifi } } + @Override + String getApplicationJarLocalizedName(String hashVal) { + ProjectInfo.Version cdapVersion = ProjectInfo.getVersion(); + String computedHashVal = hashVal; + if (artifactsComputeHash && (artifactsComputeHashSnapshot || !cdapVersion.isSnapshot())) { + if (artifactsComputeHashTimeBucketDays > 0) { + computedHashVal = + HashUtils.timeBucketHash( + hashVal, artifactsComputeHashTimeBucketDays, System.currentTimeMillis()); + } + } + return String.format("application_%s.jar", computedHashVal); + } + private RuntimeJobInfo createRuntimeJobInfo(RuntimeSpecification runtimeSpec, Map localFiles, String jvmOpts) { Map jvmProperties = parseJvmProperties(jvmOpts); LOG.info("JVM properties {}", jvmProperties); + Map runtimeJobArguments = new HashMap<>(); + String applicationJarLocalizedName = localFiles.get(Constants.Files.APPLICATION_JAR).getName(); + // The applicationJarLocalizedName is passed in runtimeJobArguments will be used by + // runtimeJobManager while launching the job so that it can be added in classpath in + // runner cluster. + runtimeJobArguments.put(Constants.Files.APPLICATION_JAR, applicationJarLocalizedName); Collection files = Stream.concat(localFiles.values().stream(), runtimeSpec.getLocalFiles().stream()).collect(Collectors.toList()); @@ -118,6 +149,8 @@ private RuntimeJobInfo createRuntimeJobInfo(RuntimeSpecification runtimeSpec, } else { cacheableFiles = new HashSet<>(); } + // Add application jar to cacheable files to reuse it across launches. + cacheableFiles.add(applicationJarLocalizedName); for (LocalFile f : files) { if (!cacheableFiles.contains(f.getName())) { @@ -127,7 +160,7 @@ private RuntimeJobInfo createRuntimeJobInfo(RuntimeSpecification runtimeSpec, } } - return new DefaultRuntimeJobInfo(getProgramRunId(), resultingFiles, jvmProperties); + return new DefaultRuntimeJobInfo(getProgramRunId(), resultingFiles, jvmProperties, runtimeJobArguments); } /** diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/runtimejob/DefaultRuntimeJobInfo.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/runtimejob/DefaultRuntimeJobInfo.java index 3ecc13847944..efe6bb3bdd3a 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/runtimejob/DefaultRuntimeJobInfo.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/distributed/runtimejob/DefaultRuntimeJobInfo.java @@ -35,10 +35,16 @@ public class DefaultRuntimeJobInfo implements RuntimeJobInfo { private final ProgramRunInfo info; private final Collection files; private final Map jvmProperties; + private final Map arguments; public DefaultRuntimeJobInfo(ProgramRunId programRunId, Collection files, Map jvmProperties) { + this(programRunId, files, jvmProperties, Collections.emptyMap()); + } + + public DefaultRuntimeJobInfo(ProgramRunId programRunId, Collection files, + Map jvmProperties, Map arguments) { this.info = new ProgramRunInfo.Builder() .setNamespace(programRunId.getNamespace()) .setApplication(programRunId.getApplication()) @@ -48,6 +54,7 @@ public DefaultRuntimeJobInfo(ProgramRunId programRunId, Collection(files)); this.jvmProperties = Collections.unmodifiableMap(new LinkedHashMap<>(jvmProperties)); + this.arguments = Collections.unmodifiableMap(arguments); } @Override @@ -69,4 +76,9 @@ public String getRuntimeJobClassname() { public Map getJvmProperties() { return jvmProperties; } + + @Override + public Map getArguments() { + return arguments; + } } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/utils/HashUtils.java b/cdap-common/src/main/java/io/cdap/cdap/common/utils/HashUtils.java new file mode 100644 index 000000000000..d3ceecec608d --- /dev/null +++ b/cdap-common/src/main/java/io/cdap/cdap/common/utils/HashUtils.java @@ -0,0 +1,45 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.common.utils; + +import java.util.concurrent.TimeUnit; + +/** Utility class for Hash-related operations. */ +public class HashUtils { + + /** + * Returns timed bucketed hash value. Therefore, for a given hash, result is identical as long as + * call to this method has happened within the given window. This method also uses a jitter based + * on provided hash to reduce likelihood of two time bucket with different hashes to be identical. + * For a given n as the jitter chosen from 0 to window-1, time bucket windows will be as follows: + * [window -n, 2*window -n), [2*window -n , 3*window -n), ... The beginning of an above time + * bucket window which currentTime lies in uniquely identifies the time bucket window for the + * given hash. + * + * @param hash value that needs to be time bucketed. + * @param window in days + * @param currentTime current time in millisecond + */ + public static String timeBucketHash(String hash, int window, long currentTime) { + // jitter is used to avoid having identical time bucket windows for different keys + long jitter = TimeUnit.DAYS.toMillis(hash.hashCode() % window); + long windowMillis = TimeUnit.DAYS.toMillis(window); + + long nextBucket = (currentTime / windowMillis) * windowMillis + windowMillis - jitter; + return String.format("%s_%s", hash, nextBucket); + } +} diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/deploy/InMemoryProgramRunDispatcherTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/utils/HashUtilsTest.java similarity index 80% rename from cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/deploy/InMemoryProgramRunDispatcherTest.java rename to cdap-common/src/test/java/io/cdap/cdap/common/utils/HashUtilsTest.java index 3028378e06bf..ab5faf3f3d5c 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/deploy/InMemoryProgramRunDispatcherTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/utils/HashUtilsTest.java @@ -1,5 +1,5 @@ /* - * Copyright © 2022 Cask Data, Inc. + * Copyright © 2023 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -14,7 +14,7 @@ * the License. */ -package io.cdap.cdap.internal.app.deploy; +package io.cdap.cdap.common.utils; import org.junit.Assert; import org.junit.Test; @@ -23,7 +23,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; -public class InMemoryProgramRunDispatcherTest { +public class HashUtilsTest { @Test public void testTimeBucketHash() { @@ -33,22 +33,19 @@ public void testTimeBucketHash() { Set results2 = new HashSet<>(); long dayInMilliSec = TimeUnit.DAYS.toMillis(1); for (int i = 0; i < 15; i++) { - results1.add(InMemoryProgramRunDispatcher.timeBucketHash("hash1", window, currentTime + (i * dayInMilliSec))); - results2.add(InMemoryProgramRunDispatcher.timeBucketHash("hash2", window, currentTime + (i * dayInMilliSec))); + results1.add(HashUtils.timeBucketHash("hash1", window, currentTime + (i * dayInMilliSec))); + results2.add(HashUtils.timeBucketHash("hash2", window, currentTime + (i * dayInMilliSec))); } // for 15 days window, we should have maximum 2 time buckets Assert.assertTrue(results1.size() >= 1 && results1.size() <= 2); Assert.assertTrue(results2.size() >= 1 && results2.size() <= 2); - for (String res : results1) { results2.remove(res); } - for (String res : results2) { results1.remove(res); } - // ensures that jitter works so all keys do not end up in the same bucket Assert.assertNotEquals(results1.size(), 0); Assert.assertNotEquals(results2.size(), 0); diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocJobMain.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocJobMain.java index f293a781a1ff..f63e55395420 100644 --- a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocJobMain.java +++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocJobMain.java @@ -68,6 +68,10 @@ public static void main(String[] args) throws Exception { if (!arguments.containsKey(SPARK_COMPAT)) { throw new RuntimeException("Missing --" + SPARK_COMPAT + " argument for the spark compat version"); } + if (!arguments.containsKey(Constants.Files.APPLICATION_JAR)) { + throw new RuntimeException( + String.format("Missing --%s argument for the application jar name", Constants.Files.APPLICATION_JAR)); + } Thread.setDefaultUncaughtExceptionHandler((t, e) -> LOG.error("Uncaught exception from thread {}", t, e)); @@ -83,6 +87,7 @@ public static void main(String[] args) throws Exception { String runtimeJobClassName = arguments.get(RUNTIME_JOB_CLASS).iterator().next(); String sparkCompat = arguments.get(SPARK_COMPAT).iterator().next(); + String applicationJarLocalizedName = arguments.get(Constants.Files.APPLICATION_JAR).iterator().next(); ClassLoader cl = DataprocJobMain.class.getClassLoader(); if (!(cl instanceof URLClassLoader)) { @@ -91,7 +96,7 @@ public static void main(String[] args) throws Exception { // create classpath from resources, application and twill jars URL[] urls = getClasspath((URLClassLoader) cl, Arrays.asList(Constants.Files.RESOURCES_JAR, - Constants.Files.APPLICATION_JAR, + applicationJarLocalizedName, Constants.Files.TWILL_JAR)); Arrays.stream(urls).forEach(url -> LOG.debug("Classpath URL: {}", url)); diff --git a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocRuntimeJobManager.java b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocRuntimeJobManager.java index 2400236a692e..dc444d79033e 100644 --- a/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocRuntimeJobManager.java +++ b/cdap-runtime-ext-dataproc/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/DataprocRuntimeJobManager.java @@ -130,7 +130,7 @@ public class DataprocRuntimeJobManager implements RuntimeJobManager { private volatile ClusterControllerClient clusterControllerClient; // CDAP specific artifacts which will be cached in GCS. private static final List artifactsCacheablePerCDAPVersion = new ArrayList<>( - Arrays.asList(Constants.Files.TWILL_JAR, Constants.Files.LAUNCHER_JAR, Constants.Files.APPLICATION_JAR) + Arrays.asList(Constants.Files.TWILL_JAR, Constants.Files.LAUNCHER_JAR) ); private static final int SNAPSHOT_EXPIRE_DAYS = 7; private static final int EXPIRE_DAYS = 730; @@ -631,6 +631,7 @@ private InputStream openStream(URI uri) throws IOException { */ private SubmitJobRequest getSubmitJobRequest(RuntimeJobInfo runtimeJobInfo, List localFiles) throws IOException { + String applicationJarLocalizedName = runtimeJobInfo.getArguments().get(Constants.Files.APPLICATION_JAR); ProgramRunInfo runInfo = runtimeJobInfo.getProgramRunInfo(); String runId = runInfo.getRun(); @@ -645,6 +646,7 @@ private SubmitJobRequest getSubmitJobRequest(RuntimeJobInfo runtimeJobInfo, for (Map.Entry entry : runtimeJobInfo.getJvmProperties().entrySet()) { arguments.add("--" + DataprocJobMain.PROPERTY_PREFIX + entry.getKey() + "=\"" + entry.getValue() + "\""); } + arguments.add("--" + Constants.Files.APPLICATION_JAR + "=" + applicationJarLocalizedName); Map properties = new LinkedHashMap<>(); properties.put(CDAP_RUNTIME_NAMESPACE, runInfo.getNamespace()); diff --git a/cdap-runtime-spi/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/RuntimeJobInfo.java b/cdap-runtime-spi/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/RuntimeJobInfo.java index ceb102ad84ea..6605e7c0bc04 100644 --- a/cdap-runtime-spi/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/RuntimeJobInfo.java +++ b/cdap-runtime-spi/src/main/java/io/cdap/cdap/runtime/spi/runtimejob/RuntimeJobInfo.java @@ -50,4 +50,11 @@ public interface RuntimeJobInfo { default Map getJvmProperties() { return Collections.emptyMap(); } + + /** + * Returns a set of arguments for process that runs the {@link RuntimeJob}. + */ + default Map getArguments() { + return Collections.emptyMap(); + } }