diff --git a/src/main/java/common/PairRDDAggregator.java b/src/main/java/common/PairRDDAggregator.java
index ce394f8..4027013 100644
--- a/src/main/java/common/PairRDDAggregator.java
+++ b/src/main/java/common/PairRDDAggregator.java
@@ -3,6 +3,8 @@
 import java.util.ArrayList;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.storage.StorageLevel;
+
 import scala.Tuple2;
 import scala.collection.JavaConverters;
@@ -57,10 +59,12 @@ void processBatch() {
     final var newBatchRDD = sc.parallelizePairs(currentBatch, numPartitions);
 
     // To avoid running out of memory, 'checkpoint' the RDD. (The goal is to
-    // force it to be fully evaluated and then stored on disk, removing any need
-    // to recompute it, since receomputing requires that the original array of
-    // batch data must be present in memory somewhere).
+    // force it to be fully evaluated (and potentially evicted to disk),
+    // removing any need to recompute it, since receomputing requires that the
+    // original array of batch data must be present in memory somewhere).
+    newBatchRDD.cache();
     newBatchRDD.checkpoint();
+    // newBatchRDD.persist(StorageLevel.MEMORY_AND_DISK());
 
     batches.add(newBatchRDD);
     currentBatch = null;
diff --git a/src/main/java/prover/Prover.java b/src/main/java/prover/Prover.java
index 8ac4bb0..9c6b68b 100644
--- a/src/main/java/prover/Prover.java
+++ b/src/main/java/prover/Prover.java
@@ -127,9 +127,9 @@ static JavaSparkContext createSparkContext(boolean local) {
     spark.sparkContext().conf().set("spark.files.overwrite", "true");
 
     // checkpoint directory
-    spark.sparkContext().setCheckpointDir("checkpoint");
+    spark.sparkContext().setCheckpointDir("hdfs://ip-172-31-42-216:9000/checkpoints/");
     // clean checkpoint files if the reference is out of scope
-    spark.sparkContext().conf().set("spark.cleaner.referenceTracking.cleanCheckpoints", "true");
+    // spark.sparkContext().conf().set("spark.cleaner.referenceTracking.cleanCheckpoints", "true");
     // TODO: reinstate this when it can be made to work
 
     // spark.sparkContext().conf().set(
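Below is a minimal, self-contained sketch of the cache-then-checkpoint pattern that the PairRDDAggregator hunk introduces. It is not the project's code: the class name CheckpointedBatchSketch, the main method, the sample key/value data, the local[*] master and the /tmp/spark-checkpoints path are all illustrative stand-ins. The point it demonstrates is that checkpointing runs as a separate job after the first action, so without cache() that job would recompute the RDD from the driver-side batch list.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class CheckpointedBatchSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("checkpointed-batch-sketch");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Checkpoint files must be written somewhere every executor can read;
            // locally a temp directory is enough, on a cluster it is an HDFS URI.
            sc.setCheckpointDir("/tmp/spark-checkpoints");

            // Stand-in for one batch of pairs accumulated on the driver.
            List<Tuple2<Long, String>> currentBatch = new ArrayList<>();
            for (long i = 0; i < 100_000; i++) {
                currentBatch.add(new Tuple2<>(i, "value-" + i));
            }

            JavaPairRDD<Long, String> newBatchRDD = sc.parallelizePairs(currentBatch, 4);

            // cache() first: the checkpoint is written by a separate job after the
            // first action completes, and without a cached copy that job would
            // recompute the RDD from the driver-side list all over again.
            newBatchRDD.cache();
            newBatchRDD.checkpoint();

            // Any action materialises the RDD and triggers the checkpoint write;
            // after this the driver no longer needs to hold 'currentBatch'.
            System.out.println("checkpointed " + newBatchRDD.count() + " pairs");
        }
    }
}

The commented-out persist(StorageLevel.MEMORY_AND_DISK()) line in the diff points at an alternative: let Spark spill partitions that don't fit in memory to executor-local disk, without writing a reliable checkpoint or truncating the lineage.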
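And a sketch of the context setup that the Prover.java hunk touches, assuming a helper shaped roughly like the original createSparkContext (the real method may differ). The class name SparkContextSetupSketch, the "namenode" host and the local /tmp path are placeholders; only the cluster-specific ip-172-31-42-216 address appears in the diff, and the cleanCheckpoints setting stays commented out pending the TODO above.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkContextSetupSketch {

    // Mirrors the configuration touched in Prover.createSparkContext.
    static JavaSparkContext createSparkContext(boolean local) {
        SparkConf conf = new SparkConf().setAppName("prover");
        if (local) {
            conf.setMaster("local[*]");
        }
        conf.set("spark.files.overwrite", "true");

        JavaSparkContext sc = new JavaSparkContext(conf);

        // The checkpoint directory has to be reachable by every executor, so on
        // a cluster it must be a shared filesystem (HDFS) rather than a relative
        // local path.  "namenode" stands in for the actual master host.
        sc.setCheckpointDir(local
                ? "/tmp/spark-checkpoints"
                : "hdfs://namenode:9000/checkpoints/");

        // Automatic checkpoint cleanup stays disabled, as in the diff,
        // until the TODO about making it work is resolved.
        // sc.getConf().set("spark.cleaner.referenceTracking.cleanCheckpoints", "true");

        return sc;
    }
}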