spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject spark git commit: [SPARK-14363] Fix executor OOM due to memory leak in the Sorter
Date Tue, 12 Apr 2016 23:12:49 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 582ed8a6e -> 413d0600e


[SPARK-14363] Fix executor OOM due to memory leak in the Sorter

Fix memory leak in the Sorter. When the UnsafeExternalSorter spills the data to disk, it does
not free up the underlying pointer array. As a result, we see a lot of executor OOMs and also
memory underutilization.
This is a regression partially introduced in PR https://github.com/apache/spark/pull/9241

Tested by running a job; observed around a 30% speedup after this change.

Author: Sital Kedia <skedia@fb.com>

Closes #12285 from sitalkedia/executor_oom.

(cherry picked from commit d187e7dea9540d26b7800de4eb79863ef5f574bf)
Signed-off-by: Davies Liu <davies.liu@gmail.com>

Conflicts:
	core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
	core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/413d0600
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/413d0600
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/413d0600

Branch: refs/heads/branch-1.6
Commit: 413d0600ed61990e657c97d50d1431a8cd1ab0ed
Parents: 582ed8a
Author: Sital Kedia <skedia@fb.com>
Authored: Tue Apr 12 16:10:07 2016 -0700
Committer: Davies Liu <davies.liu@gmail.com>
Committed: Tue Apr 12 16:12:17 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/shuffle/sort/ShuffleExternalSorter.java  | 6 ++++--
 .../org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java  | 7 +++++++
 .../util/collection/unsafe/sort/UnsafeExternalSorter.java     | 7 +++++--
 .../util/collection/unsafe/sort/UnsafeInMemorySorter.java     | 7 +++++++
 4 files changed, 23 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/413d0600/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index 52032cf..22348c0 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -215,8 +215,6 @@ final class ShuffleExternalSorter extends MemoryConsumer {
       }
     }
 
-    inMemSorter.reset();
-
     if (!isLastFile) {  // i.e. this is a spill file
       // The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records
       // are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter
@@ -255,6 +253,10 @@ final class ShuffleExternalSorter extends MemoryConsumer {
 
     writeSortedFile(false);
     final long spillSize = freeMemory();
+    inMemSorter.reset();
+    // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
+    // records. Otherwise, if the task is over allocated memory, then without freeing the memory pages,
+    // we might not be able to get memory for the pointer array.
     taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
     return spillSize;
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/413d0600/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
index d74602c..1afa719 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
@@ -49,9 +49,12 @@ final class ShuffleInMemorySorter {
    */
   private int pos = 0;
 
+  private int initialSize;
+
   public ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize) {
     this.consumer = consumer;
     assert (initialSize > 0);
+    this.initialSize = initialSize;
     this.array = consumer.allocateArray(initialSize);
     this.sorter = new Sorter<>(ShuffleSortDataFormat.INSTANCE);
   }
@@ -68,6 +71,10 @@ final class ShuffleInMemorySorter {
   }
 
   public void reset() {
+    if (consumer != null) {
+      consumer.freeArray(array);
+      this.array = consumer.allocateArray(initialSize);
+    }
     pos = 0;
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/413d0600/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 1b84e98..de38c2d 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -192,14 +192,17 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
         spillWriter.write(baseObject, baseOffset, recordLength, sortedRecords.getKeyPrefix());
       }
       spillWriter.close();
-
-      inMemSorter.reset();
     }
 
     final long spillSize = freeMemory();
     // Note that this is more-or-less going to be a multiple of the page size, so wasted space in
     // pages will currently be counted as memory spilled even though that space isn't actually
     // written to disk. This also counts the space needed to store the sorter's pointer array.
+    inMemSorter.reset();
+    // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
+    // records. Otherwise, if the task is over allocated memory, then without freeing the memory pages,
+    // we might not be able to get memory for the pointer array.
+
     taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
 
     return spillSize;

http://git-wip-us.apache.org/repos/asf/spark/blob/413d0600/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 308db22..05390c8 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -80,6 +80,8 @@ public final class UnsafeInMemorySorter {
    */
   private int pos = 0;
 
+  private long initialSize;
+
   public UnsafeInMemorySorter(
     final MemoryConsumer consumer,
     final TaskMemoryManager memoryManager,
@@ -98,6 +100,7 @@ public final class UnsafeInMemorySorter {
       LongArray array) {
     this.consumer = consumer;
     this.memoryManager = memoryManager;
+    this.initialSize = array.size();
     this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE);
     this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager);
     this.array = array;
@@ -114,6 +117,10 @@ public final class UnsafeInMemorySorter {
   }
 
   public void reset() {
+    if (consumer != null) {
+      consumer.freeArray(array);
+      this.array = consumer.allocateArray(initialSize);
+    }
     pos = 0;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message