spark-commits mailing list archives

From dav...@apache.org
Subject spark git commit: [SPARK-19500] [SQL] Fix off-by-one bug in BytesToBytesMap
Date Fri, 17 Feb 2017 17:35:38 GMT
Repository: spark
Updated Branches:
  refs/heads/master 021062af0 -> 3d0c3af0a


[SPARK-19500] [SQL] Fix off-by-one bug in BytesToBytesMap

## What changes were proposed in this pull request?

Radix sort requires that half of the array be free (as temporary space), so we use 0.5 as the
scale factor to make sure that BytesToBytesMap never holds more items than 1/2 of its capacity.
It turned out this is not guaranteed: the current implementation of append() could leave 1 more
item than the threshold (1/2 of capacity) in the array, which breaks the requirement of radix
sort (failing the assert in 2.2, or failing to insert into InMemorySorter in 2.1).

This PR fixes the off-by-one bug in BytesToBytesMap.
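
To make the boundary concrete, here is a minimal standalone sketch (hypothetical class and
method names, not Spark code) of how the strict `>` check lets one extra item into the array
before growth triggers, while `>=` grows exactly at half capacity:

```java
// OffByOneSketch.java -- hypothetical illustration, not part of Spark.
public class OffByOneSketch {

  // Models append(): numKeys is incremented first, then the growth check runs.
  // Returns the number of entries in the array at the moment growth first triggers.
  static int entriesWhenGrowthTriggers(int growthThreshold, boolean strictCheck) {
    int numKeys = 0;
    while (true) {
      numKeys++;  // one append()
      boolean shouldGrow = strictCheck
          ? numKeys > growthThreshold    // old check
          : numKeys >= growthThreshold;  // fixed check
      if (shouldGrow) {
        return numKeys;
      }
    }
  }

  public static void main(String[] args) {
    int capacity = 64;
    int growthThreshold = (int) (capacity * 0.5);  // 32, i.e. half of capacity
    System.out.println("old  '>'  check triggers growth at "
        + entriesWhenGrowthTriggers(growthThreshold, true) + " entries");   // 33
    System.out.println("fixed '>=' check triggers growth at "
        + entriesWhenGrowthTriggers(growthThreshold, false) + " entries");  // 32
  }
}
```

If growth then fails (for example because no memory is left), the old check leaves 33 entries in
a 64-slot array, one more than radix sort can tolerate as scratch space.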

This PR also fixes a bug where the array would never grow again after one failed grow attempt
(staying at its initial capacity), introduced by #15722.
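
A minimal sketch (hypothetical names, not the real BytesToBytesMap) of that second bug: once a
grow attempt fails, the flag that permits growth stays false even after the map is reset for
reuse, so the fix re-enables it in reset():

```java
// GrowFlagSketch.java -- hypothetical illustration of the flag lifecycle.
class GrowFlagSketch {
  private boolean canGrowArray = true;

  // Called when a grow attempt cannot acquire memory: stop trying to grow for now.
  void onGrowFailure() {
    canGrowArray = false;
  }

  // Called when the map is reused after a spill. Without re-enabling the flag here,
  // the map stays pinned at its initial capacity for the rest of its life.
  void reset() {
    canGrowArray = true;
  }

  boolean canGrow() {
    return canGrowArray;
  }
}
```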

## How was this patch tested?

Added regression test.

Author: Davies Liu <davies@databricks.com>

Closes #16844 from davies/off_by_one.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3d0c3af0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3d0c3af0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3d0c3af0

Branch: refs/heads/master
Commit: 3d0c3af0a76757c20e429c38efa4f14a15c9097a
Parents: 021062a
Author: Davies Liu <davies@databricks.com>
Authored: Fri Feb 17 09:38:06 2017 -0800
Committer: Davies Liu <davies.liu@gmail.com>
Committed: Fri Feb 17 09:38:06 2017 -0800

----------------------------------------------------------------------
 .../spark/unsafe/map/BytesToBytesMap.java       |  5 ++-
 .../UnsafeFixedWidthAggregationMapSuite.scala   | 40 ++++++++++++++++++++
 2 files changed, 43 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3d0c3af0/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 44120e5..4bef21b 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -698,7 +698,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
       if (numKeys == MAX_CAPACITY
         // The map could be reused from last spill (because of no enough memory to grow),
         // then we don't try to grow again if hit the `growthThreshold`.
-        || !canGrowArray && numKeys > growthThreshold) {
+        || !canGrowArray && numKeys >= growthThreshold) {
         return false;
       }
 
@@ -742,7 +742,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
         longArray.set(pos * 2 + 1, keyHashcode);
         isDefined = true;
 
-        if (numKeys > growthThreshold && longArray.size() < MAX_CAPACITY) {
+        if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) {
           try {
             growAndRehash();
           } catch (OutOfMemoryError oom) {
@@ -911,6 +911,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
       freePage(dataPage);
     }
     allocate(initialCapacity);
+    canGrowArray = true;
     currentPage = null;
     pageCursor = 0;
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3d0c3af0/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index c155511..6cf18de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -342,4 +342,44 @@ class UnsafeFixedWidthAggregationMapSuite
     }
   }
 
+  testWithMemoryLeakDetection("convert to external sorter after fail to grow (SPARK-19500)") {
+    val pageSize = 4096000
+    val map = new UnsafeFixedWidthAggregationMap(
+      emptyAggregationBuffer,
+      aggBufferSchema,
+      groupKeySchema,
+      taskMemoryManager,
+      128, // initial capacity
+      pageSize,
+      false // disable perf metrics
+    )
+
+    val rand = new Random(42)
+    for (i <- 1 to 63) {
+      val str = rand.nextString(1024)
+      val buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+      buf.setInt(0, str.length)
+    }
+    // Simulate running out of space
+    memoryManager.limit(0)
+    var str = rand.nextString(1024)
+    var buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+    assert(buf != null)
+    str = rand.nextString(1024)
+    buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+    assert(buf == null)
+
+    // Convert the map into a sorter. This used to fail before the fix for SPARK-10474
+    // because we would try to acquire space for the in-memory sorter pointer array before
+    // actually releasing the pages despite having spilled all of them.
+    var sorter: UnsafeKVExternalSorter = null
+    try {
+      sorter = map.destructAndCreateExternalSorter()
+      map.free()
+    } finally {
+      if (sorter != null) {
+        sorter.cleanupResources()
+      }
+    }
+  }
 }

