From 2d763b2e56f0e9505006531414474998159ee1d1 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Thu, 15 Apr 2021 20:15:48 +0800 Subject: [PATCH 01/35] Update README.md --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index fc172bae..11d8279e 100644 --- a/README.md +++ b/README.md @@ -99,6 +99,7 @@ You will find jar files under oap-common/target and oap-spark/target. To enable rdd cache on Intel Optane PMem, you need add the following configurations to `spark-defaults.conf` ``` +spark.memory.pmem.extension.enabled true spark.memory.pmem.initial.path [Your Optane PMem paths seperate with comma] spark.memory.pmem.initial.size [Your Optane PMem size in GB] spark.memory.pmem.usable.ratio [from 0 to 1, 0.85 is recommended] From e06463b4a2e810a31bf384a32ffe7d190fe531c3 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:14:33 +0800 Subject: [PATCH 02/35] Update pom.xml --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index cc236bec..f66d5cc4 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ 4.12 2.12.10 2.12 - 3.0.0 + 3.1.1 1.2 2.6 9.4.39.v20210325 From 358c891ea0039354eaec8f5113aad085f7660172 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:15:37 +0800 Subject: [PATCH 03/35] Update MemoryConsumer.java From a46dddca28b8854803db7487cdda0172856c3c3b Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:15:59 +0800 Subject: [PATCH 04/35] Update PMemManagerInitializer.java From 832cd473d737e48a8eeaa2d3b67d6437f1794300 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:16:16 +0800 Subject: [PATCH 05/35] Update TaskMemoryManager.java From 1b59a13f41d18c724404e9b401e674ff6e24fdc0 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:18:41 +0800 Subject: [PATCH 06/35] Update BytesToBytesMap.java --- .../spark/unsafe/map/BytesToBytesMap.java | 114 +++++++++++++++--- 1 file changed, 95 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index e7463908..d691baab 100644 --- a/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -393,10 +393,12 @@ public void remove() { } private void handleFailedDelete() { - // remove the spill file from disk - File file = spillWriters.removeFirst().getFile(); - if (file != null && file.exists() && !file.delete()) { - logger.error("Was unable to delete spill file {}", file.getAbsolutePath()); + if (spillWriters.size() > 0) { + // remove the spill file from disk + File file = spillWriters.removeFirst().getFile(); + if (file != null && file.exists() && !file.delete()) { + logger.error("Was unable to delete spill file {}", file.getAbsolutePath()); + } } } } @@ -406,17 +408,10 @@ private void handleFailedDelete() { * * For efficiency, all calls to `next()` will return the same {@link Location} object. * - * If any other lookups or operations are performed on this map while iterating over it, including - * `lookup()`, the behavior of the returned iterator is undefined. + * The returned iterator is thread-safe. 
However if the map is modified while iterating over it, + * the behavior of the returned iterator is undefined. */ public MapIterator iterator() { - return new MapIterator(numValues, loc, false); - } - - /** - * Returns a thread safe iterator that iterates of the entries of this map. - */ - public MapIterator safeIterator() { return new MapIterator(numValues, new Location(), false); } @@ -427,19 +422,82 @@ public MapIterator safeIterator() { * * For efficiency, all calls to `next()` will return the same {@link Location} object. * - * If any other lookups or operations are performed on this map while iterating over it, including - * `lookup()`, the behavior of the returned iterator is undefined. + * The returned iterator is thread-safe. However if the map is modified while iterating over it, + * the behavior of the returned iterator is undefined. */ public MapIterator destructiveIterator() { updatePeakMemoryUsed(); - return new MapIterator(numValues, loc, true); + return new MapIterator(numValues, new Location(), true); + } + + /** + * Iterator for the entries of this map. This is to first iterate over key indices in + * `longArray` then accessing values in `dataPages`. NOTE: this is different from `MapIterator` + * in the sense that key index is preserved here + * (See `UnsafeHashedRelation` for example of usage). + */ + public final class MapIteratorWithKeyIndex implements Iterator { + + /** + * The index in `longArray` where the key is stored. + */ + private int keyIndex = 0; + + private int numRecords; + private final Location loc; + + private MapIteratorWithKeyIndex() { + this.numRecords = numValues; + this.loc = new Location(); + } + + @Override + public boolean hasNext() { + return numRecords > 0; + } + + @Override + public Location next() { + if (!loc.isDefined() || !loc.nextValue()) { + while (longArray.get(keyIndex * 2) == 0) { + keyIndex++; + } + loc.with(keyIndex, 0, true); + keyIndex++; + } + numRecords--; + return loc; + } + } + + /** + * Returns an iterator for iterating over the entries of this map, + * by first iterating over the key index inside hash map's `longArray`. + * + * For efficiency, all calls to `next()` will return the same {@link Location} object. + * + * The returned iterator is NOT thread-safe. If the map is modified while iterating over it, + * the behavior of the returned iterator is undefined. + */ + public MapIteratorWithKeyIndex iteratorWithKeyIndex() { + return new MapIteratorWithKeyIndex(); + } + + /** + * The maximum number of allowed keys index. + * + * The value of allowed keys index is in the range of [0, maxNumKeysIndex - 1]. + */ + public int maxNumKeysIndex() { + return (int) (longArray.size() / 2); } /** * Looks up a key, and return a {@link Location} handle that can be used to test existence * and read/write values. * - * This function always return the same {@link Location} instance to avoid object allocation. + * This function always returns the same {@link Location} instance to avoid object allocation. + * This function is not thread-safe. */ public Location lookup(Object keyBase, long keyOffset, int keyLength) { safeLookup(keyBase, keyOffset, keyLength, loc, @@ -451,7 +509,8 @@ public Location lookup(Object keyBase, long keyOffset, int keyLength) { * Looks up a key, and return a {@link Location} handle that can be used to test existence * and read/write values. * - * This function always return the same {@link Location} instance to avoid object allocation. 
+ * This function always returns the same {@link Location} instance to avoid object allocation. + * This function is not thread-safe. */ public Location lookup(Object keyBase, long keyOffset, int keyLength, int hash) { safeLookup(keyBase, keyOffset, keyLength, loc, hash); @@ -606,6 +665,14 @@ public boolean isDefined() { return isDefined; } + /** + * Returns index for key. + */ + public int getKeyIndex() { + assert (isDefined); + return pos; + } + /** * Returns the base object for key. */ @@ -743,14 +810,23 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff longArray.set(pos * 2 + 1, keyHashcode); isDefined = true; + // If the map has reached its growth threshold, try to grow it. + if (numKeys >= growthThreshold) { // We use two array entries per key, so the array size is twice the capacity. // We should compare the current capacity of the array, instead of its size. - if (numKeys >= growthThreshold && longArray.size() / 2 < MAX_CAPACITY) { + if (longArray.size() / 2 < MAX_CAPACITY) { try { growAndRehash(); } catch (SparkOutOfMemoryError oom) { canGrowArray = false; } + } else { + // The map is already at MAX_CAPACITY and cannot grow. Instead, we prevent it from + // accepting any more new elements to make sure we don't exceed the load factor. If we + // need to spill later, this allows UnsafeKVExternalSorter to reuse the array for + // sorting. + canGrowArray = false; + } } } return true; From 1320ad16694812ecd3710c6178a8ae13069305c3 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:20:12 +0800 Subject: [PATCH 07/35] Update MemoryAllocator.java From 3ae52281e7aa198e45197e65b907d144ff5aead5 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:20:30 +0800 Subject: [PATCH 08/35] Update MemoryBlock.java From 6350d1b421f9b54bd75cb4412010f50081d15b16 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:20:54 +0800 Subject: [PATCH 09/35] Update PMemReader.java --- .../apache/spark/util/collection/unsafe/sort/PMemReader.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/org/apache/spark/util/collection/unsafe/sort/PMemReader.java b/src/main/java/org/apache/spark/util/collection/unsafe/sort/PMemReader.java index 1d2b1ee3..a838cc7d 100644 --- a/src/main/java/org/apache/spark/util/collection/unsafe/sort/PMemReader.java +++ b/src/main/java/org/apache/spark/util/collection/unsafe/sort/PMemReader.java @@ -79,6 +79,11 @@ public int getRecordLength() { public long getKeyPrefix() { return keyPrefix; } + + @Override + public long getCurrentPageNumber() { + throw new UnsupportedOperationException(); + } @Override public void close() { From 6a0703f4ee1143f2b27345d6ddb300dd6e490a84 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:21:10 +0800 Subject: [PATCH 10/35] Update PMemReaderForUnsafeExternalSorter.java --- .../unsafe/sort/PMemReaderForUnsafeExternalSorter.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/org/apache/spark/util/collection/unsafe/sort/PMemReaderForUnsafeExternalSorter.java b/src/main/java/org/apache/spark/util/collection/unsafe/sort/PMemReaderForUnsafeExternalSorter.java index 4a39648b..018d2785 100644 --- a/src/main/java/org/apache/spark/util/collection/unsafe/sort/PMemReaderForUnsafeExternalSorter.java +++ 
b/src/main/java/org/apache/spark/util/collection/unsafe/sort/PMemReaderForUnsafeExternalSorter.java @@ -64,6 +64,11 @@ public void loadNext() { public int getNumRecords() { return numRecords; } + + @Override + public long getCurrentPageNumber() { + throw new UnsupportedOperationException(); + } /** * load more PMem records in the buffer From 8d852bf0b8094e6b0b56576594da2a72234c4b43 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:21:29 +0800 Subject: [PATCH 11/35] Update PMemSpillWriterFactory.java From b29b62f99cf94a024220632617b47a61684a4616 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:21:51 +0800 Subject: [PATCH 12/35] Update PMemSpillWriterType.java From 0aa0fa86d051e97c3f99607e2cb8ae1a18c3359b Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:22:08 +0800 Subject: [PATCH 13/35] Update PMemWriter.java From 9e0c90f62992f3b76be859414ba3bbcc271a878b Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:22:24 +0800 Subject: [PATCH 14/35] Update SortedIteratorForSpills.java From 1fa170624c1be847d9c6a97113a5b0efe80ae0a2 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:23:04 +0800 Subject: [PATCH 15/35] Update SortedPMemPageSpillWriter.java --- .../collection/unsafe/sort/SortedPMemPageSpillWriter.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/main/java/org/apache/spark/util/collection/unsafe/sort/SortedPMemPageSpillWriter.java b/src/main/java/org/apache/spark/util/collection/unsafe/sort/SortedPMemPageSpillWriter.java index bd8ca9c8..3b1324f9 100644 --- a/src/main/java/org/apache/spark/util/collection/unsafe/sort/SortedPMemPageSpillWriter.java +++ b/src/main/java/org/apache/spark/util/collection/unsafe/sort/SortedPMemPageSpillWriter.java @@ -212,6 +212,12 @@ public SortedPMemPageSpillReader() throws IOException{ numRecordsOnDisk = diskSpillWriter.recordsSpilled(); } } + + @Override + public long getCurrentPageNumber() { + throw new UnsupportedOperationException(); + } + @Override public boolean hasNext() { return curNumOfRec < numRecordsOnPMem + numRecordsOnDisk; From e64a414ac1ac1e1b1a61c47829f1a8229435bdad Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:23:26 +0800 Subject: [PATCH 16/35] Update SpillWriterForUnsafeSorter.java From 715bab38601c455c2f4abb0859828b1e7ea768d0 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:23:42 +0800 Subject: [PATCH 17/35] Update UnsafeExternalSorter.java --- .../unsafe/sort/UnsafeExternalSorter.java | 195 ++++++++++++------ 1 file changed, 132 insertions(+), 63 deletions(-) diff --git a/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 9f27c80b..7ca3f334 100644 --- a/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -113,11 +113,14 @@ public static UnsafeExternalSorter createWithExistingInMemorySorter( int initialSize, long pageSizeBytes, int numElementsForSpillThreshold, - UnsafeInMemorySorter inMemorySorter) throws IOException { + UnsafeInMemorySorter 
inMemorySorter, + long existingMemoryConsumption) throws IOException { UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager, serializerManager, taskContext, recordComparatorSupplier, prefixComparator, initialSize, pageSizeBytes, numElementsForSpillThreshold, inMemorySorter, false /* ignored */); sorter.spill(Long.MAX_VALUE, sorter); + taskContext.taskMetrics().incMemoryBytesSpilled(existingMemoryConsumption); + sorter.totalSpillBytes += existingMemoryConsumption; // The external sorter will be used to insert records, in-memory sorter is not needed. sorter.inMemSorter = null; return sorter; @@ -217,22 +220,26 @@ public long spill(long size, MemoryConsumer trigger) throws IOException { } if (inMemSorter == null || inMemSorter.numRecords() <= 0) { + // There could still be some memory allocated when there are no records in the in-memory + // sorter. We will not spill it however, to ensure that we can always process at least one + // record before spilling. See the comments in `allocateMemoryForRecordIfNecessary` for why + // this is necessary. return 0L; } logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)", Thread.currentThread().getId(), Utils.bytesToString(getMemoryUsage()), - spillWriters.size() , + spillWriters.size(), spillWriters.size() > 1 ? " times" : " time"); ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics(); // Sorting records or not will be handled by different spill writer, here null is given instead. spillWithWriter(null, inMemSorter.numRecords(), writeMetrics, false); final long spillSize = freeMemory(); - inMemSorter.reset(); // Note that this is more-or-less going to be a multiple of the page size, so wasted space in // pages will currently be counted as memory spilled even though that space isn't actually // written to disk. This also counts the space needed to store the sorter's pointer array. + inMemSorter.freeMemory(); // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the // records. Otherwise, if the task is over allocated memory, then without freeing the memory // pages, we might not be able to get memory for the pointer array. @@ -262,14 +269,9 @@ public SpillWriterForUnsafeSorter spillWithWriter( taskContext.taskMetrics(), spillToPMemEnabled, isSorted); - if (spillWriter == null) { - logger.error("failed to get SpillWriter, please check related configuration."); - return null; - } else { - spillWriter.write(); - spillWriters.add(spillWriter); - return spillWriter; - } + spillWriter.write(); + spillWriters.add(spillWriter); + return spillWriter; } /** @@ -343,7 +345,6 @@ private long freeMemory() { return memoryFreed; } - private void freeSpills() { for (SpillWriterForUnsafeSorter spillWriter : spillWriters) { spillWriter.clearAll(); @@ -359,7 +360,7 @@ public void cleanupResources() { freeSpills(); freeMemory(); if (inMemSorter != null) { - inMemSorter.free(); + inMemSorter.freeMemory(); inMemSorter = null; } } @@ -373,40 +374,53 @@ public void cleanupResources() { private void growPointerArrayIfNecessary() throws IOException { assert(inMemSorter != null); if (!inMemSorter.hasSpaceForAnotherRecord()) { + if (inMemSorter.numRecords() <= 0) { + // Spilling was triggered just before this method was called. The pointer array was freed + // during the spill, so a new pointer array needs to be allocated here. 
+ LongArray array = allocateArray(inMemSorter.getInitialSize()); + inMemSorter.expandPointerArray(array); + return; + } + long used = inMemSorter.getMemoryUsage(); - LongArray array; + LongArray array = null; try { // could trigger spilling array = allocateArray(used / 8 * 2); } catch (TooLargePageException e) { // The pointer array is too big to fix in a single page, spill. spill(); - return; } catch (SparkOutOfMemoryError e) { - // should have trigger spilling - if (!inMemSorter.hasSpaceForAnotherRecord()) { + if (inMemSorter.numRecords() > 0) { logger.error("Unable to grow the pointer array"); throw e; } - return; + // The new array could not be allocated, but that is not an issue as it is longer needed, + // as all records were spilled. } - // check if spilling is triggered or not - if (inMemSorter.hasSpaceForAnotherRecord()) { - freeArray(array); - } else { - inMemSorter.expandPointerArray(array); + + if (inMemSorter.numRecords() <= 0) { + // Spilling was triggered while trying to allocate the new array. + if (array != null) { + // We succeeded in allocating the new array, but, since all records were spilled, a + // smaller array would also suffice. + freeArray(array); + } + // The pointer array was freed during the spill, so a new pointer array needs to be + // allocated here. + array = allocateArray(inMemSorter.getInitialSize()); } + inMemSorter.expandPointerArray(array); } } /** - * Allocates more memory in order to insert an additional record. This will request additional - * memory from the memory manager and spill if the requested memory can not be obtained. + * Allocates an additional page in order to insert an additional record. This will request + * additional memory from the memory manager and spill if the requested memory can not be + * obtained. * * @param required the required space in the data page, in bytes, including space for storing - * the record size. This must be less than or equal to the page size (records - * that exceed the page size are handled via a different code path which uses - * special overflow pages). + * the record size. */ private void acquireNewPageIfNecessary(int required) { if (currentPage == null || @@ -418,6 +432,37 @@ private void acquireNewPageIfNecessary(int required) { } } + /** + * Allocates more memory in order to insert an additional record. This will request additional + * memory from the memory manager and spill if the requested memory can not be obtained. + * + * @param required the required space in the data page, in bytes, including space for storing + * the record size. + */ + private void allocateMemoryForRecordIfNecessary(int required) throws IOException { + // Step 1: + // Ensure that the pointer array has space for another record. This may cause a spill. + growPointerArrayIfNecessary(); + // Step 2: + // Ensure that the last page has space for another record. This may cause a spill. + acquireNewPageIfNecessary(required); + // Step 3: + // The allocation in step 2 could have caused a spill, which would have freed the pointer + // array allocated in step 1. Therefore we need to check again whether we have to allocate + // a new pointer array. + // + // If the allocation in this step causes a spill event then it will not cause the page + // allocated in the previous step to be freed. The function `spill` only frees memory if at + // least one record has been inserted in the in-memory sorter. This will not be the case if + // we have spilled in the previous step. 
+ // + // If we did not spill in the previous step then `growPointerArrayIfNecessary` will be a + // no-op that does not allocate any memory, and therefore can't cause a spill event. + // + // Thus there is no need to call `acquireNewPageIfNecessary` again after this step. + growPointerArrayIfNecessary(); + } + /** * Write a record to the sorter. */ @@ -432,11 +477,10 @@ public void insertRecord( spill(); } - growPointerArrayIfNecessary(); - int uaoSize = UnsafeAlignedOffset.getUaoSize(); + final int uaoSize = UnsafeAlignedOffset.getUaoSize(); // Need 4 or 8 bytes to store the record length. final int required = length + uaoSize; - acquireNewPageIfNecessary(required); + allocateMemoryForRecordIfNecessary(required); final Object base = currentPage.getBaseObject(); final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor); @@ -459,10 +503,9 @@ public void insertKVRecord(Object keyBase, long keyOffset, int keyLen, Object valueBase, long valueOffset, int valueLen, long prefix, boolean prefixIsNull) throws IOException { - growPointerArrayIfNecessary(); - int uaoSize = UnsafeAlignedOffset.getUaoSize(); + final int uaoSize = UnsafeAlignedOffset.getUaoSize(); final int required = keyLen + valueLen + (2 * uaoSize); - acquireNewPageIfNecessary(required); + allocateMemoryForRecordIfNecessary(required); final Object base = currentPage.getBaseObject(); final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor); @@ -483,6 +526,7 @@ public void insertKVRecord(Object keyBase, long keyOffset, int keyLen, */ public void merge(UnsafeExternalSorter other) throws IOException { other.spill(); + totalSpillBytes += other.totalSpillBytes; spillWriters.addAll(other.spillWriters); // remove them from `spillWriters`, or the files will be deleted in `cleanupResources`. other.spillWriters.clear(); @@ -523,10 +567,13 @@ public UnsafeSorterIterator getSortedIterator() throws IOException { */ class SpillableIterator extends UnsafeSorterIterator { private UnsafeSorterIterator upstream; - private UnsafeSorterIterator nextUpstream = null; private MemoryBlock lastPage = null; private boolean loaded = false; - private int numRecords = 0; + private int numRecords; + private Object currentBaseObject; + private long currentBaseOffset; + private int currentRecordLength; + private long currentKeyPrefix; SpillableIterator(UnsafeSorterIterator inMemIterator) { this.upstream = inMemIterator; @@ -538,27 +585,33 @@ public int getNumRecords() { return numRecords; } + @Override + public long getCurrentPageNumber() { + throw new UnsupportedOperationException(); + } + public long spill() throws IOException { synchronized (this) { - if (!(upstream instanceof UnsafeInMemorySorter.SortedIterator && nextUpstream == null - && numRecords > 0)) { + if (inMemSorter == null) { return 0L; } - UnsafeInMemorySorter.SortedIterator inMemIterator = + + long currentPageNumber = upstream.getCurrentPageNumber(); + + UnsafeInMemorySorter.SortedIterator inMemIterator = ((UnsafeInMemorySorter.SortedIterator) upstream).clone(); - + ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics(); long released = 0L; SpillWriterForUnsafeSorter spillWriter = spillWithWriter(inMemIterator, numRecords, writeMetrics, true); - nextUpstream = spillWriter.getSpillReader(); - assert(nextUpstream != null); + upstream = spillWriter.getSpillReader(); + synchronized (UnsafeExternalSorter.this) { // release the pages except the one that is used. 
There can still be a caller that // is accessing the current record. We free this page in that caller's next loadNext() // call. for (MemoryBlock page : allocatedPages) { - if (!loaded || page.pageNumber != - ((UnsafeInMemorySorter.SortedIterator)upstream).getCurrentPageNumber()) { + if (!loaded || page.pageNumber != currentPageNumber) { released += page.size(); freePage(page); } else { @@ -566,13 +619,18 @@ public long spill() throws IOException { } } allocatedPages.clear(); + if (lastPage != null) { + // Add the last page back to the list of allocated pages to make sure it gets freed in + // case loadNext() never gets called again. + allocatedPages.add(lastPage); + } } // in-memory sorter will not be used after spilling assert(inMemSorter != null); released += inMemSorter.getMemoryUsage(); totalSortTimeNanos += inMemSorter.getSortTimeNanos(); - inMemSorter.free(); + inMemSorter.freeMemory(); inMemSorter = null; taskContext.taskMetrics().incMemoryBytesSpilled(released); taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.bytesWritten()); @@ -588,26 +646,32 @@ public boolean hasNext() { @Override public void loadNext() throws IOException { + assert upstream != null; MemoryBlock pageToFree = null; try { synchronized (this) { loaded = true; - if (nextUpstream != null) { - // Just consumed the last record from in memory iterator - if(lastPage != null) { - // Do not free the page here, while we are locking `SpillableIterator`. The `freePage` - // method locks the `TaskMemoryManager`, and it's a bad idea to lock 2 objects in - // sequence. We may hit dead lock if another thread locks `TaskMemoryManager` and - // `SpillableIterator` in sequence, which may happen in - // `TaskMemoryManager.acquireExecutionMemory`. - pageToFree = lastPage; - lastPage = null; - } - upstream = nextUpstream; - nextUpstream = null; + // Just consumed the last record from the in-memory iterator. + if (lastPage != null) { + // Do not free the page here, while we are locking `SpillableIterator`. The `freePage` + // method locks the `TaskMemoryManager`, and it's a bad idea to lock 2 objects in + // sequence. We may hit dead lock if another thread locks `TaskMemoryManager` and + // `SpillableIterator` in sequence, which may happen in + // `TaskMemoryManager.acquireExecutionMemory`. + pageToFree = lastPage; + allocatedPages.clear(); + lastPage = null; } numRecords--; upstream.loadNext(); + + // Keep track of the current base object, base offset, record length, and key prefix, + // so that the current record can still be read in case a spill is triggered and we + // switch to the spill writer's iterator. 
+ currentBaseObject = upstream.getBaseObject(); + currentBaseOffset = upstream.getBaseOffset(); + currentRecordLength = upstream.getRecordLength(); + currentKeyPrefix = upstream.getKeyPrefix(); } } finally { if (pageToFree != null) { @@ -618,22 +682,22 @@ public void loadNext() throws IOException { @Override public Object getBaseObject() { - return upstream.getBaseObject(); + return currentBaseObject; } @Override public long getBaseOffset() { - return upstream.getBaseOffset(); + return currentBaseOffset; } @Override public int getRecordLength() { - return upstream.getRecordLength(); + return currentRecordLength; } @Override public long getKeyPrefix() { - return upstream.getKeyPrefix(); + return currentKeyPrefix; } } @@ -663,7 +727,7 @@ public UnsafeSorterIterator getIterator(int startIndex) throws IOException { } i += spillWriter.recordsSpilled(); } - if (inMemSorter != null) { + if (inMemSorter != null && inMemSorter.numRecords() > 0) { UnsafeSorterIterator iter = inMemSorter.getSortedIterator(); moveOver(iter, startIndex - i); queue.add(iter); @@ -709,6 +773,11 @@ static class ChainedIterator extends UnsafeSorterIterator { public int getNumRecords() { return numRecords; } + + @Override + public long getCurrentPageNumber() { + return current.getCurrentPageNumber(); + } @Override public boolean hasNext() { From 485cc282f267337c8c53175c48bac71a30b4c833 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:23:57 +0800 Subject: [PATCH 18/35] Update UnsafeInMemorySorter.java --- .../unsafe/sort/UnsafeInMemorySorter.java | 51 +++++++------------ 1 file changed, 17 insertions(+), 34 deletions(-) diff --git a/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index b1c1d393..575aeea0 100644 --- a/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -159,6 +159,10 @@ private int getUsableCapacity() { return (int) (array.size() / (radixSortSupport != null ? 2 : 1.5)); } + public long getInitialSize() { + return initialSize; + } + /** * Free the memory used by pointer array. */ @@ -168,47 +172,19 @@ public void freeWithoutLongArray() { } } - public void free() { + public void freeMemory() { if (consumer != null) { if (array != null) { consumer.freeArray(array); } - array = null; - } - } - public void resetWithoutLongArrray() { - if (consumer != null) { - // the call to consumer.allocateArray may trigger a spill which in turn access this instance - // and eventually re-enter this method and try to free the array again. by setting the array - // to null and its length to 0 we effectively make the spill code-path a no-op. setting the - // array to null also indicates that it has already been de-allocated which prevents a double - // de-allocation in free(). + // Set the array to null instead of allocating a new array. Allocating an array could have + // triggered another spill and this method already is called from UnsafeExternalSorter when + // spilling. Attempting to allocate while spilling is dangerous, as we could be holding onto + // a large partially complete allocation, which may prevent other memory from being allocated. + // Instead we will allocate the new array when it is necessary. 
array = null; usableCapacity = 0; - pos = 0; - nullBoundaryPos = 0; - array = consumer.allocateArray(initialSize); - usableCapacity = getUsableCapacity(); - } - pos = 0; - nullBoundaryPos = 0; - } - - public void reset() { - if (consumer != null) { - consumer.freeArray(array); - // the call to consumer.allocateArray may trigger a spill which in turn access this instance - // and eventually re-enter this method and try to free the array again. by setting the array - // to null and its length to 0 we effectively make the spill code-path a no-op. setting the - // array to null also indicates that it has already been de-allocated which prevents a double - // de-allocation in free(). - array = null; - usableCapacity = 0; - pos = 0; - nullBoundaryPos = 0; - array = consumer.allocateArray(initialSize); - usableCapacity = getUsableCapacity(); } pos = 0; nullBoundaryPos = 0; @@ -250,6 +226,7 @@ public boolean hasSpaceForAnotherRecord() { } public void expandPointerArray(LongArray newArray) { + if (array != null) { if (newArray.size() < array.size()) { // checkstyle.off: RegexpSinglelineJava throw new SparkOutOfMemoryError("Not enough memory to grow pointer array"); @@ -262,6 +239,7 @@ public void expandPointerArray(LongArray newArray) { newArray.getBaseOffset(), pos * 8L); consumer.freeArray(array); + } array = newArray; usableCapacity = getUsableCapacity(); } @@ -365,6 +343,7 @@ public void loadNext() { @Override public long getBaseOffset() { return baseOffset; } + @Override public long getCurrentPageNumber() { return currentPageNumber; } @@ -387,6 +366,10 @@ public long getCurrentPageNumber() { * {@code next()} will return the same mutable object. */ public UnsafeSorterIterator getSortedIterator() { + if (numRecords() == 0) { + // `array` might be null, so make sure that it is not accessed by returning early. 
+ return new SortedIterator(0, 0); + } int offset = 0; long start = System.nanoTime(); if (sortComparator != null) { From 400ca0172346b238a5219cd055c1bfb928a15f2e Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:24:16 +0800 Subject: [PATCH 19/35] Update UnsafeSorterPMemSpillWriter.java From 68ddd0d2fac8638326ce5246b12b9ee6ed7dc17a Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:24:52 +0800 Subject: [PATCH 20/35] Update UnsafeSorterPMemSpillWriter.java From facb72e0a8578ccb7d8d82977d8d7dd5cb6bdaa1 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:25:19 +0800 Subject: [PATCH 21/35] Update UnsafeSorterSpillReader.java --- .../util/collection/unsafe/sort/UnsafeSorterSpillReader.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java index 25ca11fb..a7ed81b2 100644 --- a/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java +++ b/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java @@ -97,6 +97,11 @@ public int getNumRecords() { return numRecords; } + @Override + public long getCurrentPageNumber() { + throw new UnsupportedOperationException(); + } + @Override public boolean hasNext() { return (numRecordsRemaining > 0); From c33de3c71c28ac0c07570dcc4b5e66a66a572318 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:25:36 +0800 Subject: [PATCH 22/35] Update UnsafeSorterSpillWriter.java From 3d2b3652e92e9135b3612bfa09903d72af6760b6 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:25:56 +0800 Subject: [PATCH 23/35] Update UnsafeSorterStreamSpillReader.java From 1834c8d30198716bb50bfc30bd3a43d87169d4ad Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:26:15 +0800 Subject: [PATCH 24/35] Update UnsafeSorterStreamSpillWriter.java From e6f9698d978c234f3765b4210aaf1daddb6fde80 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:26:39 +0800 Subject: [PATCH 25/35] Update package.scala --- .../spark/internal/config/package.scala | 333 +++++++++++++++--- 1 file changed, 290 insertions(+), 43 deletions(-) diff --git a/src/main/scala/org/apache/spark/internal/config/package.scala b/src/main/scala/org/apache/spark/internal/config/package.scala index 07661590..8cf62cf4 100644 --- a/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/src/main/scala/org/apache/spark/internal/config/package.scala @@ -17,6 +17,7 @@ package org.apache.spark.internal +import java.util.Locale import java.util.concurrent.TimeUnit import org.apache.spark.launcher.SparkLauncher @@ -271,6 +272,13 @@ package object config { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("0") + private[spark] val EXECUTOR_METRICS_FILESYSTEM_SCHEMES = + ConfigBuilder("spark.executor.metrics.fileSystemSchemes") + .doc("The file system schemes to report in executor metrics.") + .version("3.1.0") + .stringConf + .createWithDefaultString("file,hdfs") + private[spark] val EXECUTOR_JAVA_OPTIONS = ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS) 
.withPrepended(SparkLauncher.EXECUTOR_DEFAULT_JAVA_OPTIONS) @@ -302,8 +310,8 @@ package object config { .createWithDefaultString("1g") private[spark] val EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.executor.memoryOverhead") - .doc("The amount of non-heap memory to be allocated per executor in cluster mode, " + - "in MiB unless otherwise specified.") + .doc("The amount of non-heap memory to be allocated per executor, in MiB unless otherwise" + + " specified.") .version("2.3.0") .bytesConf(ByteUnit.MiB) .createOptional @@ -337,11 +345,31 @@ package object config { .checkValue(_ >= 0, "The off-heap memory size must not be negative") .createWithDefault(0) + private[spark] val MEMORY_STORAGE_FRACTION = ConfigBuilder("spark.memory.storageFraction") + .doc("Amount of storage memory immune to eviction, expressed as a fraction of the " + + "size of the region set aside by spark.memory.fraction. The higher this is, the " + + "less working memory may be available to execution and tasks may spill to disk more " + + "often. Leaving this at the default value is recommended. ") + .version("1.6.0") + .doubleConf + .checkValue(v => v >= 0.0 && v < 1.0, "Storage fraction must be in [0,1)") + .createWithDefault(0.5) + + private[spark] val MEMORY_FRACTION = ConfigBuilder("spark.memory.fraction") + .doc("Fraction of (heap space - 300MB) used for execution and storage. The " + + "lower this is, the more frequently spills and cached data eviction occur. " + + "The purpose of this config is to set aside memory for internal metadata, " + + "user data structures, and imprecise size estimation in the case of sparse, " + + "unusually large records. Leaving this at the default value is recommended. ") + .version("1.6.0") + .doubleConf + .createWithDefault(0.6) + val MEMORY_SPILL_PMEM_ENABLED = ConfigBuilder("spark.memory.spill.pmem.enabled") .doc("Set memory spill to PMem instead of disk.") .booleanConf - .createWithDefault(true) + .createWithDefault(false) val MEMORY_EXTENDED_PATH = ConfigBuilder("spark.memory.extended.path") @@ -388,26 +416,6 @@ package object config { s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") .createWithDefaultString("8m") - private[spark] val MEMORY_STORAGE_FRACTION = ConfigBuilder("spark.memory.storageFraction") - .doc("Amount of storage memory immune to eviction, expressed as a fraction of the " + - "size of the region set aside by spark.memory.fraction. The higher this is, the " + - "less working memory may be available to execution and tasks may spill to disk more " + - "often. Leaving this at the default value is recommended. ") - .version("1.6.0") - .doubleConf - .checkValue(v => v >= 0.0 && v < 1.0, "Storage fraction must be in [0,1)") - .createWithDefault(0.5) - - private[spark] val MEMORY_FRACTION = ConfigBuilder("spark.memory.fraction") - .doc("Fraction of (heap space - 300MB) used for execution and storage. The " + - "lower this is, the more frequently spills and cached data eviction occur. " + - "The purpose of this config is to set aside memory for internal metadata, " + - "user data structures, and imprecise size estimation in the case of sparse, " + - "unusually large records. Leaving this at the default value is recommended. 
") - .version("1.6.0") - .doubleConf - .createWithDefault(0.6) - private[spark] val STORAGE_SAFETY_FRACTION = ConfigBuilder("spark.storage.safetyFraction") .version("1.1.0") .doubleConf @@ -464,6 +472,67 @@ package object config { .intConf .createWithDefault(1) + private[spark] val STORAGE_DECOMMISSION_ENABLED = + ConfigBuilder("spark.storage.decommission.enabled") + .doc("Whether to decommission the block manager when decommissioning executor") + .version("3.1.0") + .booleanConf + .createWithDefault(false) + + private[spark] val STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED = + ConfigBuilder("spark.storage.decommission.shuffleBlocks.enabled") + .doc("Whether to transfer shuffle blocks during block manager decommissioning. Requires " + + "a migratable shuffle resolver (like sort based shuffle)") + .version("3.1.0") + .booleanConf + .createWithDefault(false) + + private[spark] val STORAGE_DECOMMISSION_SHUFFLE_MAX_THREADS = + ConfigBuilder("spark.storage.decommission.shuffleBlocks.maxThreads") + .doc("Maximum number of threads to use in migrating shuffle files.") + .version("3.1.0") + .intConf + .checkValue(_ > 0, "The maximum number of threads should be positive") + .createWithDefault(8) + + private[spark] val STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED = + ConfigBuilder("spark.storage.decommission.rddBlocks.enabled") + .doc("Whether to transfer RDD blocks during block manager decommissioning.") + .version("3.1.0") + .booleanConf + .createWithDefault(false) + + private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK = + ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock") + .internal() + .doc("Maximum number of failures which can be handled for the replication of " + + "one RDD block when block manager is decommissioning and trying to move its " + + "existing blocks.") + .version("3.1.0") + .intConf + .createWithDefault(3) + + private[spark] val STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL = + ConfigBuilder("spark.storage.decommission.replicationReattemptInterval") + .internal() + .doc("The interval of time between consecutive cache block replication reattempts " + + "happening on each decommissioning executor (due to storage decommissioning).") + .version("3.1.0") + .timeConf(TimeUnit.MILLISECONDS) + .checkValue(_ > 0, "Time interval between two consecutive attempts of " + + "cache block replication should be positive.") + .createWithDefaultString("30s") + + private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH = + ConfigBuilder("spark.storage.decommission.fallbackStorage.path") + .doc("The location for fallback storage during block manager decommissioning. " + + "For example, `s3a://spark-storage/`. In case of empty, fallback storage is disabled. 
" + + "The storage should be managed by TTL because Spark will not clean it up.") + .version("3.1.0") + .stringConf + .checkValue(_.endsWith(java.io.File.separator), "Path should end with separator.") + .createOptional + private[spark] val STORAGE_REPLICATION_TOPOLOGY_FILE = ConfigBuilder("spark.storage.replication.topologyFile") .version("2.1.0") @@ -488,6 +557,13 @@ package object config { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString(Network.NETWORK_TIMEOUT.defaultValueString) + private[spark] val STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT = + ConfigBuilder("spark.storage.blockManagerHeartbeatTimeoutMs") + .version("0.7.0") + .withAlternative("spark.storage.blockManagerSlaveTimeoutMs") + .timeConf(TimeUnit.MILLISECONDS) + .createOptional + private[spark] val STORAGE_CLEANUP_FILES_AFTER_EXECUTOR_EXIT = ConfigBuilder("spark.storage.cleanupFilesAfterExecutorExit") .doc("Whether or not cleanup the files not served by the external shuffle service " + @@ -595,6 +671,16 @@ package object config { .version("1.2.0") .fallbackConf(DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT) + private[spark] val LEGACY_LOCALITY_WAIT_RESET = + ConfigBuilder("spark.locality.wait.legacyResetOnTaskLaunch") + .doc("Whether to use the legacy behavior of locality wait, which resets the delay timer " + + "anytime a task is scheduled. See Delay Scheduling section of TaskSchedulerImpl's class " + + "documentation for more details.") + .internal() + .version("3.1.0") + .booleanConf + .createWithDefault(false) + private[spark] val LOCALITY_WAIT = ConfigBuilder("spark.locality.wait") .version("0.5.0") .timeConf(TimeUnit.MILLISECONDS) @@ -711,6 +797,13 @@ package object config { .booleanConf .createWithDefault(true) + private[spark] val EXCLUDE_ON_FAILURE_ENABLED = + ConfigBuilder("spark.excludeOnFailure.enabled") + .version("3.1.0") + .withAlternative("spark.blacklist.enabled") + .booleanConf + .createOptional + // Blacklist confs private[spark] val BLACKLIST_ENABLED = ConfigBuilder("spark.blacklist.enabled") @@ -719,38 +812,44 @@ package object config { .createOptional private[spark] val MAX_TASK_ATTEMPTS_PER_EXECUTOR = - ConfigBuilder("spark.blacklist.task.maxTaskAttemptsPerExecutor") - .version("2.1.0") + ConfigBuilder("spark.excludeOnFailure.task.maxTaskAttemptsPerExecutor") + .version("3.1.0") + .withAlternative("spark.blacklist.task.maxTaskAttemptsPerExecutor") .intConf .createWithDefault(1) private[spark] val MAX_TASK_ATTEMPTS_PER_NODE = - ConfigBuilder("spark.blacklist.task.maxTaskAttemptsPerNode") - .version("2.1.0") + ConfigBuilder("spark.excludeOnFailure.task.maxTaskAttemptsPerNode") + .version("3.1.0") + .withAlternative("spark.blacklist.task.maxTaskAttemptsPerNode") .intConf .createWithDefault(2) private[spark] val MAX_FAILURES_PER_EXEC = - ConfigBuilder("spark.blacklist.application.maxFailedTasksPerExecutor") - .version("2.2.0") + ConfigBuilder("spark.excludeOnFailure.application.maxFailedTasksPerExecutor") + .version("3.1.0") + .withAlternative("spark.blacklist.application.maxFailedTasksPerExecutor") .intConf .createWithDefault(2) private[spark] val MAX_FAILURES_PER_EXEC_STAGE = - ConfigBuilder("spark.blacklist.stage.maxFailedTasksPerExecutor") - .version("2.1.0") + ConfigBuilder("spark.excludeOnFailure.stage.maxFailedTasksPerExecutor") + .version("3.1.0") + .withAlternative("spark.blacklist.stage.maxFailedTasksPerExecutor") .intConf .createWithDefault(2) private[spark] val MAX_FAILED_EXEC_PER_NODE = - ConfigBuilder("spark.blacklist.application.maxFailedExecutorsPerNode") - .version("2.2.0") + 
ConfigBuilder("spark.excludeOnFailure.application.maxFailedExecutorsPerNode") + .version("3.1.0") + .withAlternative("spark.blacklist.application.maxFailedExecutorsPerNode") .intConf .createWithDefault(2) private[spark] val MAX_FAILED_EXEC_PER_NODE_STAGE = - ConfigBuilder("spark.blacklist.stage.maxFailedExecutorsPerNode") - .version("2.1.0") + ConfigBuilder("spark.excludeOnFailure.stage.maxFailedExecutorsPerNode") + .version("3.1.0") + .withAlternative("spark.blacklist.stage.maxFailedExecutorsPerNode") .intConf .createWithDefault(2) @@ -778,7 +877,35 @@ package object config { .version("2.3.0") .booleanConf .createWithDefault(false) - // End blacklist confs + + private[spark] val EXCLUDE_ON_FAILURE_TIMEOUT_CONF = + ConfigBuilder("spark.excludeOnFailure.timeout") + .version("3.1.0") + .withAlternative("spark.blacklist.timeout") + .timeConf(TimeUnit.MILLISECONDS) + .createOptional + + private[spark] val EXCLUDE_ON_FAILURE_KILL_ENABLED = + ConfigBuilder("spark.excludeOnFailure.killExcludedExecutors") + .version("3.1.0") + .withAlternative("spark.blacklist.killBlacklistedExecutors") + .booleanConf + .createWithDefault(false) + + private[spark] val EXCLUDE_ON_FAILURE_LEGACY_TIMEOUT_CONF = + ConfigBuilder("spark.scheduler.executorTaskExcludeOnFailureTime") + .internal() + .version("3.1.0") + .withAlternative("spark.scheduler.executorTaskBlacklistTime") + .timeConf(TimeUnit.MILLISECONDS) + .createOptional + + private[spark] val EXCLUDE_ON_FAILURE_FETCH_FAILURE_ENABLED = + ConfigBuilder("spark.excludeOnFailure.application.fetchFailure.enabled") + .version("3.1.0") + .withAlternative("spark.blacklist.application.fetchFailure.enabled") + .booleanConf + .createWithDefault(false) private[spark] val UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE = ConfigBuilder("spark.files.fetchFailure.unRegisterOutputOnHost") @@ -1404,10 +1531,9 @@ package object config { private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED = ConfigBuilder("spark.shuffle.readHostLocalDisk") - .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and external " + - s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled), shuffle " + - "blocks requested from those block managers which are running on the same host are read " + - "from the disk directly instead of being fetched as remote blocks over the network.") + .doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled, shuffle " + + "blocks requested from those block managers which are running on the same host are " + + "read from the disk directly instead of being fetched as remote blocks over the network.") .version("3.0.0") .booleanConf .createWithDefault(true) @@ -1443,10 +1569,12 @@ package object config { .createWithDefaultString("365d") private[spark] val UNSCHEDULABLE_TASKSET_TIMEOUT = - ConfigBuilder("spark.scheduler.blacklist.unschedulableTaskSetTimeout") + ConfigBuilder("spark.scheduler.excludeOnFailure.unschedulableTaskSetTimeout") .doc("The timeout in seconds to wait to acquire a new executor and schedule a task " + - "before aborting a TaskSet which is unschedulable because of being completely blacklisted.") - .version("2.4.1") + "before aborting a TaskSet which is unschedulable because all executors are " + + "excluded due to failures.") + .version("3.1.0") + .withAlternative("spark.scheduler.blacklist.unschedulableTaskSetTimeout") .timeConf(TimeUnit.SECONDS) .checkValue(v => v >= 0, "The value should be a non negative time value.") .createWithDefault(120) @@ -1775,6 +1903,16 @@ package object config { .toSequence 
.createWithDefault(Nil) + private[spark] val ARCHIVES = ConfigBuilder("spark.archives") + .version("3.1.0") + .doc("Comma-separated list of archives to be extracted into the working directory of each " + + "executor. .jar, .tar.gz, .tgz and .zip are supported. You can specify the directory " + + "name to unpack via adding '#' after the file name to unpack, for example, " + + "'file.zip#directory'. This configuration is experimental.") + .stringConf + .toSequence + .createWithDefault(Nil) + private[spark] val SUBMIT_DEPLOY_MODE = ConfigBuilder("spark.submit.deployMode") .version("1.5.0") .stringConf @@ -1808,6 +1946,7 @@ package object config { ConfigBuilder("spark.scheduler.mode") .version("0.8.0") .stringConf + .transform(_.toUpperCase(Locale.ROOT)) .createWithDefault(SchedulingMode.FIFO.toString) private[spark] val SCHEDULER_REVIVE_INTERVAL = @@ -1854,6 +1993,30 @@ package object config { .timeConf(TimeUnit.MILLISECONDS) .createOptional + private[spark] val DECOMMISSION_ENABLED = + ConfigBuilder("spark.decommission.enabled") + .doc("When decommission enabled, Spark will try its best to shutdown the executor " + + s"gracefully. Spark will try to migrate all the RDD blocks (controlled by " + + s"${STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED.key}) and shuffle blocks (controlled by " + + s"${STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED.key}) from the decommissioning " + + s"executor to a remote executor when ${STORAGE_DECOMMISSION_ENABLED.key} is enabled. " + + s"With decommission enabled, Spark will also decommission an executor instead of " + + s"killing when ${DYN_ALLOCATION_ENABLED.key} enabled.") + .version("3.1.0") + .booleanConf + .createWithDefault(false) + + private[spark] val EXECUTOR_DECOMMISSION_KILL_INTERVAL = + ConfigBuilder("spark.executor.decommission.killInterval") + .doc("Duration after which a decommissioned executor will be killed forcefully." + + "This config is useful for cloud environments where we know in advance when " + + "an executor is going to go down after decommissioning signal i.e. around 2 mins " + + "in aws spot nodes, 1/2 hrs in spot block nodes etc. This config is currently " + + "used to decide what tasks running on decommission executors to speculate.") + .version("3.1.0") + .timeConf(TimeUnit.SECONDS) + .createOptional + private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir") .doc("Staging directory used while submitting applications.") .version("2.0.0") @@ -1866,4 +2029,88 @@ package object config { .bytesConf(ByteUnit.BYTE) .createOptional + private[spark] val RESOURCE_PROFILE_MERGE_CONFLICTS = + ConfigBuilder("spark.scheduler.resource.profileMergeConflicts") + .doc("If set to true, Spark will merge ResourceProfiles when different profiles " + + "are specified in RDDs that get combined into a single stage. When they are merged, " + + "Spark chooses the maximum of each resource and creates a new ResourceProfile. The " + + "default of false results in Spark throwing an exception if multiple different " + + "ResourceProfiles are found in RDDs going into the same stage.") + .version("3.1.0") + .booleanConf + .createWithDefault(false) + + private[spark] val STANDALONE_SUBMIT_WAIT_APP_COMPLETION = + ConfigBuilder("spark.standalone.submit.waitAppCompletion") + .doc("In standalone cluster mode, controls whether the client waits to exit until the " + + "application completes. If set to true, the client process will stay alive polling " + + "the driver's status. 
Otherwise, the client process will exit after submission.") + .version("3.1.0") + .booleanConf + .createWithDefault(false) + + private[spark] val EXECUTOR_ALLOW_SPARK_CONTEXT = + ConfigBuilder("spark.executor.allowSparkContext") + .doc("If set to true, SparkContext can be created in executors.") + .version("3.0.1") + .booleanConf + .createWithDefault(false) + + private[spark] val EXECUTOR_KILL_ON_FATAL_ERROR_DEPTH = + ConfigBuilder("spark.executor.killOnFatalError.depth") + .doc("The max depth of the exception chain in a failed task Spark will search for a fatal " + + "error to check whether it should kill an executor. 0 means not checking any fatal " + + "error, 1 means checking only the exception but not the cause, and so on.") + .internal() + .version("3.1.0") + .intConf + .checkValue(_ >= 0, "needs to be a non-negative value") + .createWithDefault(5) + + private[spark] val PUSH_BASED_SHUFFLE_ENABLED = + ConfigBuilder("spark.shuffle.push.enabled") + .doc("Set to 'true' to enable push-based shuffle on the client side and this works in " + + "conjunction with the server side flag spark.shuffle.server.mergedShuffleFileManagerImpl " + + "which needs to be set with the appropriate " + + "org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for push-based " + + "shuffle to be enabled") + .version("3.1.0") + .booleanConf + .createWithDefault(false) + + private[spark] val SHUFFLE_MERGER_MAX_RETAINED_LOCATIONS = + ConfigBuilder("spark.shuffle.push.maxRetainedMergerLocations") + .doc("Maximum number of shuffle push merger locations cached for push based shuffle. " + + "Currently, shuffle push merger locations are nothing but external shuffle services " + + "which are responsible for handling pushed blocks and merging them and serving " + + "merged blocks for later shuffle fetch.") + .version("3.1.0") + .intConf + .createWithDefault(500) + + private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO = + ConfigBuilder("spark.shuffle.push.mergersMinThresholdRatio") + .doc("The minimum number of shuffle merger locations required to enable push based " + + "shuffle for a stage. This is specified as a ratio of the number of partitions in " + + "the child stage. For example, a reduce stage which has 100 partitions and uses the " + + "default value 0.05 requires at least 5 unique merger locations to enable push based " + + "shuffle. Merger locations are currently defined as external shuffle services.") + .version("3.1.0") + .doubleConf + .createWithDefault(0.05) + + private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD = + ConfigBuilder("spark.shuffle.push.mergersMinStaticThreshold") + .doc(s"The static threshold for number of shuffle push merger locations should be " + + "available in order to enable push based shuffle for a stage. Note this config " + + s"works in conjunction with ${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key}. " + + "Maximum of spark.shuffle.push.mergersMinStaticThreshold and " + + s"${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key} ratio number of mergers needed to " + + "enable push based shuffle for a stage. 
For eg: with 1000 partitions for the child " + + "stage with spark.shuffle.push.mergersMinStaticThreshold as 5 and " + + s"${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key} set to 0.05, we would need " + + "at least 50 mergers to enable push based shuffle for that stage.") + .version("3.1.0") + .doubleConf + .createWithDefault(5) } From d1ac9c0dd2e315c4b0782497f922c4e18b4dee01 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:26:55 +0800 Subject: [PATCH 26/35] Update ExtendedMemoryPool.scala --- src/main/scala/org/apache/spark/memory/ExtendedMemoryPool.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/scala/org/apache/spark/memory/ExtendedMemoryPool.scala b/src/main/scala/org/apache/spark/memory/ExtendedMemoryPool.scala index 50dc05b8..9ba43483 100644 --- a/src/main/scala/org/apache/spark/memory/ExtendedMemoryPool.scala +++ b/src/main/scala/org/apache/spark/memory/ExtendedMemoryPool.scala @@ -108,4 +108,3 @@ private[memory] class ExtendedMemoryPool(lock: Object) extends MemoryPool(lock) numBytesToFree } } - From 3484c925d6b56076aa574661683739c9aef26b23 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:27:09 +0800 Subject: [PATCH 27/35] Update MemoryManager.scala From ff985190a2cb9bb77dd8b22768f0312853f3d2f6 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:27:21 +0800 Subject: [PATCH 28/35] Update StorageMemoryPool.scala From 8d96ae9a4837538c0c826f04a81bd7e0f4aeec91 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:27:36 +0800 Subject: [PATCH 29/35] Update UnifiedMemoryManager.scala From 4c8864dbd5d60f295b82b36b6b0a98a8b569c6a0 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:28:00 +0800 Subject: [PATCH 30/35] Update MemoryStore.scala --- .../scala/org/apache/spark/storage/memory/MemoryStore.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 36174159..41302557 100644 --- a/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -95,9 +95,11 @@ private[spark] class MemoryStore( // A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes) // All accesses of this map are assumed to have manually synchronized on `memoryManager` private val onHeapUnrollMemoryMap = mutable.HashMap[Long, Long]() + // Note: off-heap unroll memory is only used in putIteratorAsBytes() because off-heap caching // always stores serialized values. private val offHeapUnrollMemoryMap = mutable.HashMap[Long, Long]() + // Note: pmem unroll memory is only used in putIteratorAsBytes() because off-heap caching // always stores serialized values. 
private val pmemUnrollMemoryMap = mutable.HashMap[Long, Long]() From 1204cc11ca96883f16237381f18bd0e109afe906 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:28:19 +0800 Subject: [PATCH 31/35] Update BlockManager.scala --- .../apache/spark/storage/BlockManager.scala | 200 +++++++++++++----- 1 file changed, 148 insertions(+), 52 deletions(-) diff --git a/src/main/scala/org/apache/spark/storage/BlockManager.scala b/src/main/scala/org/apache/spark/storage/BlockManager.scala index 5b3379c8..c78ca91f 100644 --- a/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -18,12 +18,13 @@ package org.apache.spark.storage import java.io._ -import java.lang.ref.{ReferenceQueue => JReferenceQueue, WeakReference} +import java.lang.ref.{WeakReference, ReferenceQueue => JReferenceQueue} import java.nio.ByteBuffer import java.nio.channels.Channels import java.util.Collections import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, TimeUnit} +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.HashMap import scala.concurrent.{ExecutionContext, Future} @@ -55,7 +56,8 @@ import org.apache.spark.network.util.TransportConf import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.serializer.{SerializerInstance, SerializerManager} -import org.apache.spark.shuffle.{ShuffleManager, ShuffleWriteMetricsReporter} +import org.apache.spark.shuffle.{MigratableResolver, ShuffleManager, ShuffleWriteMetricsReporter} +import org.apache.spark.storage.BlockManagerMessages.{DecommissionBlockManager, ReplicateBlock} import org.apache.spark.storage.memory._ import org.apache.spark.unsafe.Platform import org.apache.spark.util._ @@ -119,9 +121,7 @@ private[spark] class ByteBufferBlockData( private[spark] class HostLocalDirManager( futureExecutionContext: ExecutionContext, cacheSize: Int, - externalBlockStoreClient: ExternalBlockStoreClient, - host: String, - externalShuffleServicePort: Int) extends Logging { + blockStoreClient: BlockStoreClient) extends Logging { private val executorIdToLocalDirsCache = CacheBuilder @@ -129,24 +129,25 @@ private[spark] class HostLocalDirManager( .maximumSize(cacheSize) .build[String, Array[String]]() - private[spark] def getCachedHostLocalDirs() - : scala.collection.Map[String, Array[String]] = executorIdToLocalDirsCache.synchronized { - import scala.collection.JavaConverters._ - return executorIdToLocalDirsCache.asMap().asScala - } + private[spark] def getCachedHostLocalDirs: Map[String, Array[String]] = + executorIdToLocalDirsCache.synchronized { + executorIdToLocalDirsCache.asMap().asScala.toMap + } private[spark] def getHostLocalDirs( + host: String, + port: Int, executorIds: Array[String])( - callback: Try[java.util.Map[String, Array[String]]] => Unit): Unit = { + callback: Try[Map[String, Array[String]]] => Unit): Unit = { val hostLocalDirsCompletable = new CompletableFuture[java.util.Map[String, Array[String]]] - externalBlockStoreClient.getHostLocalDirs( + blockStoreClient.getHostLocalDirs( host, - externalShuffleServicePort, + port, executorIds, hostLocalDirsCompletable) hostLocalDirsCompletable.whenComplete { (hostLocalDirs, throwable) => if (hostLocalDirs != null) { - callback(Success(hostLocalDirs)) + callback(Success(hostLocalDirs.asScala.toMap)) executorIdToLocalDirsCache.synchronized { 
executorIdToLocalDirsCache.putAll(hostLocalDirs) } @@ -164,7 +165,7 @@ private[spark] class HostLocalDirManager( * Note that [[initialize()]] must be called before the BlockManager is usable. */ private[spark] class BlockManager( - executorId: String, + val executorId: String, rpcEnv: RpcEnv, val master: BlockManagerMaster, val serializerManager: SerializerManager, @@ -255,7 +256,7 @@ private[spark] class BlockManager( private val maxOnHeapMemory = memoryManager.maxOnHeapStorageMemory private val maxOffHeapMemory = memoryManager.maxOffHeapStorageMemory - private val externalShuffleServicePort = StorageUtils.externalShuffleServicePort(conf) + private[spark] val externalShuffleServicePort = StorageUtils.externalShuffleServicePort(conf) var blockManagerId: BlockManagerId = _ @@ -271,9 +272,9 @@ private[spark] class BlockManager( private val maxFailuresBeforeLocationRefresh = conf.get(config.BLOCK_FAILURES_BEFORE_LOCATION_REFRESH) - private val slaveEndpoint = rpcEnv.setupEndpoint( + private val storageEndpoint = rpcEnv.setupEndpoint( "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next, - new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker)) + new BlockManagerStorageEndpoint(rpcEnv, this, mapOutputTracker)) // Pending re-registration action being executed asynchronously or null if none is pending. // Accesses should synchronize on asyncReregisterLock. @@ -287,6 +288,10 @@ private[spark] class BlockManager( private var blockReplicationPolicy: BlockReplicationPolicy = _ + // visible for test + // This is volatile since if it's defined we should not accept remote blocks. + @volatile private[spark] var decommissioner: Option[BlockManagerDecommissioner] = None + // A DownloadFileManager used to track all the files of remote blocks which are above the // specified memory threshold. Files will be deleted automatically based on weak reference. // Exposed for test @@ -296,6 +301,26 @@ private[spark] class BlockManager( var hostLocalDirManager: Option[HostLocalDirManager] = None + @inline final private def isDecommissioning() = { + decommissioner.isDefined + } + + @inline final private def checkShouldStore(blockId: BlockId) = { + // Don't reject broadcast blocks since they may be stored during task exec and + // don't need to be migrated. + if (isDecommissioning() && !blockId.isBroadcast) { + throw new BlockSavedOnDecommissionedBlockManagerException(blockId) + } + } + + // This is a lazy val so someone can migrating RDDs even if they don't have a MigratableResolver + // for shuffles. Used in BlockManagerDecommissioner & block puts. + private[storage] lazy val migratableResolver: MigratableResolver = { + shuffleManager.shuffleBlockResolver.asInstanceOf[MigratableResolver] + } + + override def getLocalDiskDirs: Array[String] = diskBlockManager.localDirsString + /** * Abstraction for storing blocks from bytes, whether they start in memory or on disk. 
* @@ -406,7 +431,7 @@ private[spark] class BlockManager( ThreadUtils.awaitReady(replicationFuture, Duration.Inf) } catch { case NonFatal(t) => - throw new Exception("Error occurred while waiting for replication to finish", t) + throw new SparkException("Error occurred while waiting for replication to finish", t) } } if (blockWasSuccessfullyStored) { @@ -508,7 +533,7 @@ private[spark] class BlockManager( diskBlockManager.localDirsString, maxOnHeapMemory, maxOffHeapMemory, - slaveEndpoint) + storageEndpoint) blockManagerId = if (idFromMaster != null) idFromMaster else id @@ -524,20 +549,17 @@ private[spark] class BlockManager( registerWithExternalShuffleServer() } - hostLocalDirManager = + hostLocalDirManager = { if (conf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) && !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) { - externalBlockStoreClient.map { blockStoreClient => - new HostLocalDirManager( - futureExecutionContext, - conf.get(config.STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE), - blockStoreClient, - blockManagerId.host, - externalShuffleServicePort) - } + Some(new HostLocalDirManager( + futureExecutionContext, + conf.get(config.STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE), + blockStoreClient)) } else { None } + } logInfo(s"Initialized BlockManager: $blockManagerId") } @@ -586,8 +608,8 @@ private[spark] class BlockManager( * an executor crash. * * This function deliberately fails silently if the master returns false (indicating that - * the slave needs to re-register). The error condition will be detected again by the next - * heart beat attempt or new block registration and another try to re-register all blocks + * the storage endpoint needs to re-register). The error condition will be detected again by the + * next heart beat attempt or new block registration and another try to re-register all blocks * will be made then. */ private def reportAllBlocks(): Unit = { @@ -611,7 +633,7 @@ private[spark] class BlockManager( // TODO: We might need to rate limit re-registering. 
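For reference, the `hostLocalDirManager` branch in the `initialize()` hunk above is gated on two configuration constants: the manager is created only when host-local disk reading is enabled and the old fetch protocol is not in use. Assuming the usual keys behind `config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED` and `config.SHUFFLE_USE_OLD_FETCH_PROTOCOL` (`spark.shuffle.readHostLocalDisk` and `spark.shuffle.useOldFetchProtocol`; the key strings are not spelled out in this hunk, so treat them as assumptions), an illustrative client configuration that keeps the host-local path active is:

```scala
import org.apache.spark.SparkConf

// Illustrative only: key strings are assumed from the constant names used in the hunk above.
val conf = new SparkConf()
  .set("spark.shuffle.readHostLocalDisk", "true")     // SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED
  .set("spark.shuffle.useOldFetchProtocol", "false")  // must stay off for HostLocalDirManager to be created
```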
logInfo(s"BlockManager $blockManagerId re-registering with master") master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, maxOnHeapMemory, - maxOffHeapMemory, slaveEndpoint) + maxOffHeapMemory, storageEndpoint) reportAllBlocks() } @@ -660,7 +682,17 @@ private[spark] class BlockManager( */ override def getLocalBlockData(blockId: BlockId): ManagedBuffer = { if (blockId.isShuffle) { - shuffleManager.shuffleBlockResolver.getBlockData(blockId) + logDebug(s"Getting local shuffle block ${blockId}") + try { + shuffleManager.shuffleBlockResolver.getBlockData(blockId) + } catch { + case e: IOException => + if (conf.get(config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) { + FallbackStorage.read(conf, blockId) + } else { + throw e + } + } } else { getLocalBytes(blockId) match { case Some(blockData) => @@ -693,6 +725,21 @@ private[spark] class BlockManager( blockId: BlockId, level: StorageLevel, classTag: ClassTag[_]): StreamCallbackWithID = { + + checkShouldStore(blockId) + + if (blockId.isShuffle) { + logDebug(s"Putting shuffle block ${blockId}") + try { + return migratableResolver.putShuffleBlockAsStream(blockId, serializerManager) + } catch { + case e: ClassCastException => throw new SparkException( + s"Unexpected shuffle block ${blockId} with unsupported shuffle " + + s"resolver ${shuffleManager.shuffleBlockResolver}") + } + } + logDebug(s"Putting regular block ${blockId}") + // All other blocks val (_, tmpFile) = diskBlockManager.createTempLocalBlock() val channel = new CountingWritableChannel( Channels.newChannel(serializerManager.wrapForEncryption(new FileOutputStream(tmpFile)))) @@ -713,7 +760,11 @@ private[spark] class BlockManager( // stream. channel.close() val blockSize = channel.getCount - TempFileBasedBlockStoreUpdater(blockId, level, classTag, tmpFile, blockSize).save() + val blockStored = TempFileBasedBlockStoreUpdater( + blockId, level, classTag, tmpFile, blockSize).save() + if (!blockStored) { + throw new Exception(s"Failure while trying to store block $blockId on $blockManagerId.") + } } override def onFailure(streamId: String, cause: Throwable): Unit = { @@ -757,9 +808,9 @@ private[spark] class BlockManager( * * droppedMemorySize exists to account for when the block is dropped from memory to disk (so * it is still valid). This ensures that update in master will compensate for the increase in - * memory on slave. + * memory on the storage endpoint. */ - private def reportBlockStatus( + private[spark] def reportBlockStatus( blockId: BlockId, status: BlockStatus, droppedMemorySize: Long = 0L): Unit = { @@ -775,7 +826,7 @@ private[spark] class BlockManager( /** * Actually send a UpdateBlockInfo message. Returns the master's response, * which will be true if the block was successfully recorded and false if - * the slave needs to re-register. + * the storage endpoint needs to re-register. */ private def tryToReportBlockStatus( blockId: BlockId, @@ -973,7 +1024,7 @@ private[spark] class BlockManager( require(blockId != null, "BlockId is null") // Because all the remote blocks are registered in driver, it is not necessary to ask - // all the slave executors to get block status. + // all the storage endpoints to get block status. 
val locationsAndStatusOption = master.getLocationsAndStatus(blockId, blockManagerId.host) if (locationsAndStatusOption.isEmpty) { logDebug(s"Block $blockId is unknown by block manager master") @@ -1115,7 +1166,7 @@ private[spark] class BlockManager( blockSize: Long): Option[ManagedBuffer] = { val file = ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId.name) if (file.exists()) { - val mangedBuffer = securityManager.getIOEncryptionKey() match { + val managedBuffer = securityManager.getIOEncryptionKey() match { case Some(key) => // Encrypted blocks cannot be memory mapped; return a special object that does decryption // and provides InputStream / FileRegion implementations for reading the data. @@ -1126,7 +1177,7 @@ private[spark] class BlockManager( val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle") new FileSegmentManagedBuffer(transportConf, file, 0, file.length) } - Some(mangedBuffer) + Some(managedBuffer) } else { None } @@ -1340,6 +1391,7 @@ private[spark] class BlockManager( require(blockId != null, "BlockId is null") require(level != null && level.isValid, "StorageLevel is null or invalid") + checkShouldStore(blockId) val putBlockInfo = { val newInfo = new BlockInfo(level, classTag, tellMaster) @@ -1596,7 +1648,7 @@ private[spark] class BlockManager( /** * Get peer block managers in the system. */ - private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = { + private[storage] def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = { peerFetchLock.synchronized { val cachedPeersTtl = conf.get(config.STORAGE_CACHED_PEERS_TTL) // milliseconds val diff = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastPeerFetchTimeNs) @@ -1606,23 +1658,32 @@ private[spark] class BlockManager( lastPeerFetchTimeNs = System.nanoTime() logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) } - cachedPeers + if (cachedPeers.isEmpty && + conf.get(config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) { + Seq(FallbackStorage.FALLBACK_BLOCK_MANAGER_ID) + } else { + cachedPeers + } } } /** - * Called for pro-active replenishment of blocks lost due to executor failures + * Replicates a block to peer block managers based on existingReplicas and maxReplicas * * @param blockId blockId being replicate * @param existingReplicas existing block managers that have a replica * @param maxReplicas maximum replicas needed + * @param maxReplicationFailures number of replication failures to tolerate before + * giving up. + * @return whether block was successfully replicated or not */ def replicateBlock( blockId: BlockId, existingReplicas: Set[BlockManagerId], - maxReplicas: Int): Unit = { + maxReplicas: Int, + maxReplicationFailures: Option[Int] = None): Boolean = { logInfo(s"Using $blockManagerId to pro-actively replicate $blockId") - blockInfoManager.lockForReading(blockId).foreach { info => + blockInfoManager.lockForReading(blockId).forall { info => val data = doGetLocalBytes(blockId, info) val storageLevel = StorageLevel( useDisk = info.level.useDisk, @@ -1630,11 +1691,13 @@ private[spark] class BlockManager( useOffHeap = info.level.useOffHeap, deserialized = info.level.deserialized, replication = maxReplicas) - // we know we are called as a result of an executor removal, so we refresh peer cache - // this way, we won't try to replicate to a missing executor with a stale reference + // we know we are called as a result of an executor removal or because the current executor + // is getting decommissioned. 
so we refresh peer cache before trying replication, we won't + // try to replicate to a missing executor/another decommissioning executor getPeers(forceFetch = true) try { - replicate(blockId, data, storageLevel, info.classTag, existingReplicas) + replicate( + blockId, data, storageLevel, info.classTag, existingReplicas, maxReplicationFailures) } finally { logDebug(s"Releasing lock for $blockId") releaseLockAndDispose(blockId, data) @@ -1651,9 +1714,11 @@ private[spark] class BlockManager( data: BlockData, level: StorageLevel, classTag: ClassTag[_], - existingReplicas: Set[BlockManagerId] = Set.empty): Unit = { + existingReplicas: Set[BlockManagerId] = Set.empty, + maxReplicationFailures: Option[Int] = None): Boolean = { - val maxReplicationFailures = conf.get(config.STORAGE_MAX_REPLICATION_FAILURE) + val maxReplicationFailureCount = maxReplicationFailures.getOrElse( + conf.get(config.STORAGE_MAX_REPLICATION_FAILURE)) val tLevel = StorageLevel( useDisk = level.useDisk, useMemory = level.useMemory, @@ -1677,7 +1742,7 @@ private[spark] class BlockManager( blockId, numPeersToReplicateTo) - while(numFailures <= maxReplicationFailures && + while(numFailures <= maxReplicationFailureCount && !peersForReplication.isEmpty && peersReplicatedTo.size < numPeersToReplicateTo) { val peer = peersForReplication.head @@ -1701,6 +1766,10 @@ private[spark] class BlockManager( peersForReplication = peersForReplication.tail peersReplicatedTo += peer } catch { + // Rethrow interrupt exception + case e: InterruptedException => + throw e + // Everything else we may retry case NonFatal(e) => logWarning(s"Failed to replicate $blockId to $peer, failure #$numFailures", e) peersFailedToReplicateTo += peer @@ -1725,9 +1794,11 @@ private[spark] class BlockManager( if (peersReplicatedTo.size < numPeersToReplicateTo) { logWarning(s"Block $blockId replicated to only " + s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers") + return false } logDebug(s"block $blockId replicated to ${peersReplicatedTo.mkString(", ")}") + return true } /** @@ -1842,6 +1913,30 @@ private[spark] class BlockManager( blocksToRemove.size } + def decommissionBlockManager(): Unit = storageEndpoint.ask(DecommissionBlockManager) + + private[spark] def decommissionSelf(): Unit = synchronized { + decommissioner match { + case None => + logInfo("Starting block manager decommissioning process...") + decommissioner = Some(new BlockManagerDecommissioner(conf, this)) + decommissioner.foreach(_.start()) + case Some(_) => + logDebug("Block manager already in decommissioning state") + } + } + + /** + * Returns the last migration time and a boolean denoting if all the blocks have been migrated. + * If there are any tasks running since that time the boolean may be incorrect. + */ + private[spark] def lastMigrationInfo(): (Long, Boolean) = { + decommissioner.map(_.lastMigrationInfo()).getOrElse((0, false)) + } + + private[storage] def getMigratableRDDBlocks(): Seq[ReplicateBlock] = + master.getReplicateInfoForRDDBlocks(blockManagerId) + /** * Remove all blocks belonging to the given broadcast. */ @@ -1911,6 +2006,7 @@ private[spark] class BlockManager( } def stop(): Unit = { + decommissioner.foreach(_.stop()) blockTransferService.close() if (blockStoreClient ne blockTransferService) { // Closing should be idempotent, but maybe not for the NioBlockTransferService. 
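The decommissioning hooks added above (`decommissionBlockManager`, `decommissionSelf`, `lastMigrationInfo`, `getMigratableRDDBlocks`) only come into play when decommissioning is switched on. A sketch of the related settings, with key names assumed from their upstream Spark 3.1 counterparts rather than spelled out in this hunk, and values purely illustrative:

```scala
import org.apache.spark.SparkConf

// Illustrative only: key strings below are assumptions, not defined in this hunk.
val conf = new SparkConf()
  .set("spark.decommission.enabled", "true")                        // react to decommission signals
  .set("spark.storage.decommission.enabled", "true")                // migrate cached blocks off the executor
  .set("spark.storage.decommission.rddBlocks.enabled", "true")      // include RDD blocks in the migration
  .set("spark.storage.decommission.shuffleBlocks.enabled", "true")  // include shuffle blocks via MigratableResolver
```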
@@ -1918,7 +2014,7 @@ private[spark] class BlockManager( } remoteBlockTempFileManager.stop() diskBlockManager.stop() - rpcEnv.stop(slaveEndpoint) + rpcEnv.stop(storageEndpoint) blockInfoManager.clear() memoryStore.clear() futureExecutionContext.shutdownNow() From 99d75df71e6c55f149b7a7b17aba8dbbe17cb68a Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:28:56 +0800 Subject: [PATCH 32/35] Update PMemBlockObjectWriter.scala From 30f5d5f7d075f50850edcb87968edb13fcf24678 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:29:12 +0800 Subject: [PATCH 33/35] Update StorageLevel.scala --- src/main/scala/org/apache/spark/storage/StorageLevel.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/src/main/scala/org/apache/spark/storage/StorageLevel.scala index f31a9f13..01970ca0 100644 --- a/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -166,6 +166,7 @@ object StorageLevel { val NONE = new StorageLevel(false, false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, false, 2) + val DISK_ONLY_3 = new StorageLevel(true, false, false, false, false, 3) val MEMORY_ONLY = new StorageLevel(false, true, false, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false, false) @@ -187,6 +188,7 @@ object StorageLevel { case "NONE" => NONE case "DISK_ONLY" => DISK_ONLY case "DISK_ONLY_2" => DISK_ONLY_2 + case "DISK_ONLY_3" => DISK_ONLY_3 case "MEMORY_ONLY" => MEMORY_ONLY case "MEMORY_ONLY_2" => MEMORY_ONLY_2 case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER From 3b5f6172d093427b217fa85c946cccef8c5f10b0 Mon Sep 17 00:00:00 2001 From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com> Date: Fri, 16 Apr 2021 09:29:26 +0800 Subject: [PATCH 34/35] Update SparkEnv.scala --- src/main/scala/org/apache/spark/SparkEnv.scala | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/main/scala/org/apache/spark/SparkEnv.scala b/src/main/scala/org/apache/spark/SparkEnv.scala index da83f262..9b90ac9e 100644 --- a/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/src/main/scala/org/apache/spark/SparkEnv.scala @@ -125,9 +125,12 @@ class SparkEnv ( private[spark] def destroyPythonWorker(pythonExec: String, envVars: Map[String, String], worker: Socket): Unit = { - synchronized { - val key = (pythonExec, envVars) - pythonWorkers.get(key).foreach(_.stopWorker(worker)) + def destroyPythonWorker(pythonExec: String, + envVars: Map[String, String], worker: Socket): Unit = { + synchronized { + val key = (pythonExec, envVars) + pythonWorkers.get(key).foreach(_.stopWorker(worker)) + } } } @@ -377,7 +380,8 @@ object SparkEnv extends Logging { externalShuffleClient } else { None - }, blockManagerInfo)), + }, blockManagerInfo, + mapOutputTracker.asInstanceOf[MapOutputTrackerMaster])), registerOrLookupEndpoint( BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME, new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal, blockManagerInfo)), @@ -463,7 +467,8 @@ object SparkEnv extends Logging { hadoopConf: Configuration, schedulingMode: String, addedJars: Seq[String], - addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = { + addedFiles: Seq[String], + 
      addedArchives: Seq[String]): Map[String, Seq[(String, String)]] = {
     import Properties._
     val jvmInformation = Seq(
@@ -493,7 +498,7 @@ object SparkEnv extends Logging {
       .split(File.pathSeparator)
       .filterNot(_.isEmpty)
       .map((_, "System Classpath"))
-    val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
+    val addedJarsAndFiles = (addedJars ++ addedFiles ++ addedArchives).map((_, "Added By User"))
     val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted
     // Add Hadoop properties, it will not ignore configs including in Spark. Some spark

From 0c983ead75485b85ecb70fd4ae2e605cb1d1da1d Mon Sep 17 00:00:00 2001
From: Pei Qi <37234782+JustDoCoder@users.noreply.github.com>
Date: Fri, 16 Apr 2021 09:29:53 +0800
Subject: [PATCH 35/35] Update TestMemoryManager.scala
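Taken together, the user-visible pieces of this series can be exercised from a driver program. The sketch below turns on push-based shuffle with the client-side keys defined earlier in the series (the only non-default value is `spark.shuffle.push.enabled`; the rest echo the defaults shown there, and the server-side `MergedShuffleFileManager` still has to be configured on the external shuffle service) and persists an RDD with the `DISK_ONLY_3` level added in patch 33. The master URL is left to `spark-submit`.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Illustrative driver-side usage of two features touched by this series.
val conf = new SparkConf()
  .setAppName("oap-spark-features-demo")
  .set("spark.shuffle.push.enabled", "true")
  .set("spark.shuffle.push.maxRetainedMergerLocations", "500")
  .set("spark.shuffle.push.mergersMinThresholdRatio", "0.05")
  .set("spark.shuffle.push.mergersMinStaticThreshold", "5")

val sc = new SparkContext(conf)

// DISK_ONLY_3 keeps three on-disk replicas of each cached partition.
val data = sc.parallelize(1 to 1000000).persist(StorageLevel.DISK_ONLY_3)
println(data.count())

sc.stop()
```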