spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SPARK-9548][SQL] Add a destructive iterator for BytesToBytesMap
Date Thu, 06 Aug 2015 21:33:51 GMT
Repository: spark
Updated Branches:
  refs/heads/master abfedb9cd -> 21fdfd7d6


[SPARK-9548][SQL] Add a destructive iterator for BytesToBytesMap

This pull request adds a destructive iterator to BytesToBytesMap. When used, the iterator
frees pages as it traverses them. This is part of the effort to avoid starvation when more
than one operator can exhaust memory.

This is based on #7924, but fixes a bug in it: the destructive iterator must not be used in UnsafeKVExternalSorter.

Closes #7924.

Author: Liang-Chi Hsieh <viirya@appier.com>
Author: Reynold Xin <rxin@databricks.com>

Closes #8003 from rxin/map-destructive-iterator and squashes the following commits:

6b618c3 [Reynold Xin] Don't use destructive iterator in UnsafeKVExternalSorter.
a7bd8ec [Reynold Xin] Merge remote-tracking branch 'viirya/destructive_iter' into map-destructive-iterator
7652083 [Liang-Chi Hsieh] For comments: add destructiveIterator(), modify unit test, remove code block.
4a3e9de [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into destructive_iter
581e9e3 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into destructive_iter
f0ff783 [Liang-Chi Hsieh] No need to free last page.
9e9d2a3 [Liang-Chi Hsieh] Add a destructive iterator for BytesToBytesMap.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21fdfd7d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21fdfd7d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21fdfd7d

Branch: refs/heads/master
Commit: 21fdfd7d6f89adbd37066c169e6ba9ccd337683e
Parents: abfedb9
Author: Liang-Chi Hsieh <viirya@appier.com>
Authored: Thu Aug 6 14:33:29 2015 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Thu Aug 6 14:33:29 2015 -0700

----------------------------------------------------------------------
 .../spark/unsafe/map/BytesToBytesMap.java       | 33 +++++++++++++++--
 .../map/AbstractBytesToBytesMapSuite.java       | 37 +++++++++++++++++---
 .../UnsafeFixedWidthAggregationMap.java         |  7 ++--
 .../sql/execution/UnsafeKVExternalSorter.java   |  5 ++-
 4 files changed, 71 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/21fdfd7d/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 2034743..5ac3736 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -227,22 +227,35 @@ public final class BytesToBytesMap {
     private final Iterator<MemoryBlock> dataPagesIterator;
     private final Location loc;
 
-    private MemoryBlock currentPage;
+    private MemoryBlock currentPage = null;
     private int currentRecordNumber = 0;
     private Object pageBaseObject;
     private long offsetInPage;
 
+    // If this iterator destructive or not. When it is true, it frees each page as it moves
onto
+    // next one.
+    private boolean destructive = false;
+    private BytesToBytesMap bmap;
+
     private BytesToBytesMapIterator(
-        int numRecords, Iterator<MemoryBlock> dataPagesIterator, Location loc) {
+        int numRecords, Iterator<MemoryBlock> dataPagesIterator, Location loc,
+        boolean destructive, BytesToBytesMap bmap) {
       this.numRecords = numRecords;
       this.dataPagesIterator = dataPagesIterator;
       this.loc = loc;
+      this.destructive = destructive;
+      this.bmap = bmap;
       if (dataPagesIterator.hasNext()) {
         advanceToNextPage();
       }
     }
 
     private void advanceToNextPage() {
+      if (destructive && currentPage != null) {
+        dataPagesIterator.remove();
+        this.bmap.taskMemoryManager.freePage(currentPage);
+        this.bmap.shuffleMemoryManager.release(currentPage.size());
+      }
       currentPage = dataPagesIterator.next();
       pageBaseObject = currentPage.getBaseObject();
       offsetInPage = currentPage.getBaseOffset();
@@ -281,7 +294,21 @@ public final class BytesToBytesMap {
    * `lookup()`, the behavior of the returned iterator is undefined.
    */
   public BytesToBytesMapIterator iterator() {
-    return new BytesToBytesMapIterator(numElements, dataPages.iterator(), loc);
+    return new BytesToBytesMapIterator(numElements, dataPages.iterator(), loc, false, this);
+  }
+
+  /**
+   * Returns a destructive iterator for iterating over the entries of this map. It frees
each page
+   * as it moves onto next one. Notice: it is illegal to call any method on the map after
+   * `destructiveIterator()` has been called.
+   *
+   * For efficiency, all calls to `next()` will return the same {@link Location} object.
+   *
+   * If any other lookups or operations are performed on this map while iterating over it,
including
+   * `lookup()`, the behavior of the returned iterator is undefined.
+   */
+  public BytesToBytesMapIterator destructiveIterator() {
+    return new BytesToBytesMapIterator(numElements, dataPages.iterator(), loc, true, this);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/21fdfd7d/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 0e23a64..3c50033 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -183,8 +183,7 @@ public abstract class AbstractBytesToBytesMapSuite {
     }
   }
 
-  @Test
-  public void iteratorTest() throws Exception {
+  private void iteratorTestBase(boolean destructive) throws Exception {
     final int size = 4096;
     BytesToBytesMap map = new BytesToBytesMap(
       taskMemoryManager, shuffleMemoryManager, size / 2, PAGE_SIZE_BYTES);
@@ -216,7 +215,14 @@ public abstract class AbstractBytesToBytesMapSuite {
         }
       }
       final java.util.BitSet valuesSeen = new java.util.BitSet(size);
-      final Iterator<BytesToBytesMap.Location> iter = map.iterator();
+      final Iterator<BytesToBytesMap.Location> iter;
+      if (destructive) {
+        iter = map.destructiveIterator();
+      } else {
+        iter = map.iterator();
+      }
+      int numPages = map.getNumDataPages();
+      int countFreedPages = 0;
       while (iter.hasNext()) {
         final BytesToBytesMap.Location loc = iter.next();
         Assert.assertTrue(loc.isDefined());
@@ -228,11 +234,22 @@ public abstract class AbstractBytesToBytesMapSuite {
         if (keyLength == 0) {
           Assert.assertTrue("value " + value + " was not divisible by 5", value % 5 == 0);
         } else {
-        final long key = PlatformDependent.UNSAFE.getLong(
-          keyAddress.getBaseObject(), keyAddress.getBaseOffset());
+          final long key = PlatformDependent.UNSAFE.getLong(
+            keyAddress.getBaseObject(), keyAddress.getBaseOffset());
           Assert.assertEquals(value, key);
         }
         valuesSeen.set((int) value);
+        if (destructive) {
+          // The iterator moves onto next page and frees previous page
+          if (map.getNumDataPages() < numPages) {
+            numPages = map.getNumDataPages();
+            countFreedPages++;
+          }
+        }
+      }
+      if (destructive) {
+        // Latest page is not freed by iterator but by map itself
+        Assert.assertEquals(countFreedPages, numPages - 1);
       }
       Assert.assertEquals(size, valuesSeen.cardinality());
     } finally {
@@ -241,6 +258,16 @@ public abstract class AbstractBytesToBytesMapSuite {
   }
 
   @Test
+  public void iteratorTest() throws Exception {
+    iteratorTestBase(false);
+  }
+
+  @Test
+  public void destructiveIteratorTest() throws Exception {
+    iteratorTestBase(true);
+  }
+
+  @Test
   public void iteratingOverDataPagesWithWastedSpace() throws Exception {
     final int NUM_ENTRIES = 1000 * 1000;
     final int KEY_LENGTH = 24;

http://git-wip-us.apache.org/repos/asf/spark/blob/21fdfd7d/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index 0245803..efb3353 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -154,14 +154,17 @@ public final class UnsafeFixedWidthAggregationMap {
   }
 
   /**
-   * Returns an iterator over the keys and values in this map.
+   * Returns an iterator over the keys and values in this map. This uses destructive iterator
of
+   * BytesToBytesMap. So it is illegal to call any other method on this map after `iterator()`
has
+   * been called.
    *
    * For efficiency, each call returns the same object.
    */
   public KVIterator<UnsafeRow, UnsafeRow> iterator() {
     return new KVIterator<UnsafeRow, UnsafeRow>() {
 
-      private final BytesToBytesMap.BytesToBytesMapIterator mapLocationIterator = map.iterator();
+      private final BytesToBytesMap.BytesToBytesMapIterator mapLocationIterator =
+        map.destructiveIterator();
       private final UnsafeRow key = new UnsafeRow();
       private final UnsafeRow value = new UnsafeRow();
 

http://git-wip-us.apache.org/repos/asf/spark/blob/21fdfd7d/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index 6c1cf13..9a65c9d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -88,8 +88,11 @@ public final class UnsafeKVExternalSorter {
       final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter(
         taskMemoryManager, recordComparator, prefixComparator, Math.max(1, map.numElements()));
 
-      final int numKeyFields = keySchema.size();
+      // We cannot use the destructive iterator here because we are reusing the existing
memory
+      // pages in BytesToBytesMap to hold records during sorting.
+      // The only new memory we are allocating is the pointer/prefix array.
       BytesToBytesMap.BytesToBytesMapIterator iter = map.iterator();
+      final int numKeyFields = keySchema.size();
       UnsafeRow row = new UnsafeRow();
       while (iter.hasNext()) {
         final BytesToBytesMap.Location loc = iter.next();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message