spark-commits mailing list archives

From wenc...@apache.org
Subject spark git commit: [SPARK-23376][SQL] creating UnsafeKVExternalSorter with BytesToBytesMap may fail
Date Sun, 11 Feb 2018 16:15:26 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 1b4c6ab44 -> 169483455


[SPARK-23376][SQL] creating UnsafeKVExternalSorter with BytesToBytesMap may fail

This is a long-standing bug in `UnsafeKVExternalSorter` and has been reported on the dev list multiple
times.

When creating an `UnsafeKVExternalSorter` from a `BytesToBytesMap`, we need to create an `UnsafeInMemorySorter`
to sort the data in the `BytesToBytesMap`. The data format of the sorter and the map is the same,
so no data movement is required. However, both the sorter and the map need a pointer array for
some bookkeeping work.

There is an optimization in `UnsafeKVExternalSorter`: reuse the pointer array between the sorter
and the map, to avoid an extra memory allocation. This sounds like a reasonable optimization:
the length of the `BytesToBytesMap` pointer array is at least 4 times the number of keys
(to avoid hash collisions, the hash table size should be at least 2 times the number of keys,
and each key occupies 2 slots). `UnsafeInMemorySorter` needs the pointer array size to be 4 times
the number of entries, so we are safe to reuse the pointer array.
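
A rough sketch of that sizing arithmetic (the concrete numbers below are illustrative only,
assuming the map allocates two slots per hash-table bucket and keeps the table at most half full):

```
// Illustrative sizing only: why reusing the map's pointer array is normally safe.
val numKeys       = 16                    // distinct keys in the BytesToBytesMap
val hashTableSize = 2 * numKeys           // table kept at least 2x the number of keys
val mapArraySize  = 2 * hashTableSize     // each key occupies 2 slots => >= 4 * numKeys
val sorterNeeds   = 4 * numKeys           // 2 slots per record, plus half the array kept free
assert(mapArraySize >= sorterNeeds)       // holds only while every record has a distinct key
```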

However, the number of keys in the map doesn't equal the number of entries in the map,
because `BytesToBytesMap` supports duplicated keys. This breaks the assumption of the above
optimization, so we may run out of space when inserting data into the sorter and hit the error:
```
java.lang.IllegalStateException: There is no space for new record
   at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.insertRecord(UnsafeInMemorySorter.java:239)
   at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:149)
...
```
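
For intuition, consider the scenario exercised by the regression test added below: 65 records that
all share one key. Assuming the map keeps its initial 128-slot array (capacity 64 with 2 slots per
bucket, and it never grows because there is only one distinct key), the reused array cannot hold
all the records:

```
// Illustrative numbers only, assuming array size = 2 * capacity with capacity = 64.
val distinctKeys = 1
val records      = 65
val mapArraySize = 2 * 64               // 128 slots: sized for distinct keys, not records
val maxRecords   = mapArraySize / 4     // only 32 records fit once half the array must stay free
assert(records > maxRecords)            // => "There is no space for new record"
```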

This PR fixes the bug by creating a new pointer array if the existing one is not big enough.
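
A minimal Scala sketch of the shape of the check (the authoritative change is the Java diff below;
the names here simply mirror it):

```
// Reuse the map's array only if it can hold every entry (4 slots per record);
// otherwise ask the map itself to allocate a large enough array, so the memory
// manager won't ask the map to spill while doing so.
var pointerArray = map.getArray()
if (map.numValues() > pointerArray.size() / 4) {
  pointerArray = map.allocateArray(map.numValues() * 4L)
}
```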

Tested with a new test case.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20561 from cloud-fan/bug.

(cherry picked from commit 4bbd7443ebb005f81ed6bc39849940ac8db3b3cc)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16948345
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16948345
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16948345

Branch: refs/heads/branch-2.2
Commit: 169483455730e05b26f2e848072caefd90f52097
Parents: 1b4c6ab
Author: Wenchen Fan <wenchen@databricks.com>
Authored: Mon Feb 12 00:03:49 2018 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Mon Feb 12 00:15:14 2018 +0800

----------------------------------------------------------------------
 .../sql/execution/UnsafeKVExternalSorter.java   | 31 ++++++++++++----
 .../execution/UnsafeKVExternalSorterSuite.scala | 39 ++++++++++++++++++++
 2 files changed, 62 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/16948345/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index ee5bcfd..7d67b87 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -33,6 +33,7 @@ import org.apache.spark.sql.types.StructType;
 import org.apache.spark.storage.BlockManager;
 import org.apache.spark.unsafe.KVIterator;
 import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.array.LongArray;
 import org.apache.spark.unsafe.map.BytesToBytesMap;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.util.collection.unsafe.sort.*;
@@ -96,15 +97,29 @@ public final class UnsafeKVExternalSorter {
         numElementsForSpillThreshold,
         canUseRadixSort);
     } else {
-      // The array will be used to do in-place sort, which require half of the space to be empty.
-      // Note: each record in the map takes two entries in the array, one is record pointer,
-      // another is the key prefix.
-      assert(map.numKeys() * 2 <= map.getArray().size() / 2);
-      // During spilling, the array in map will not be used, so we can borrow that and use it
-      // as the underlying array for in-memory sorter (it's always large enough).
-      // Since we will not grow the array, it's fine to pass `null` as consumer.
+      // During spilling, the pointer array in `BytesToBytesMap` will not be used, so we can borrow
+      // that and use it as the pointer array for `UnsafeInMemorySorter`.
+      LongArray pointerArray = map.getArray();
+      // `BytesToBytesMap`'s pointer array is only guaranteed to hold all the distinct keys, but
+      // `UnsafeInMemorySorter`'s pointer array need to hold all the entries. Since
+      // `BytesToBytesMap` can have duplicated keys, here we need a check to make sure the pointer
+      // array can hold all the entries in `BytesToBytesMap`.
+      // The pointer array will be used to do in-place sort, which requires half of the space to be
+      // empty. Note: each record in the map takes two entries in the pointer array, one is record
+      // pointer, another is key prefix. So the required size of pointer array is `numRecords * 4`.
+      // TODO: It's possible to change UnsafeInMemorySorter to have multiple entries with same key,
+      // so that we can always reuse the pointer array.
+      if (map.numValues() > pointerArray.size() / 4) {
+        // Here we ask the map to allocate memory, so that the memory manager won't ask the map
+        // to spill, if the memory is not enough.
+        pointerArray = map.allocateArray(map.numValues() * 4L);
+      }
+
+      // Since the pointer array(either reuse the one in the map, or create a new one) is guaranteed
+      // to be large enough, it's fine to pass `null` as consumer because we won't allocate more
+      // memory.
       final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter(
-        null, taskMemoryManager, recordComparator, prefixComparator, map.getArray(),
+        null, taskMemoryManager, recordComparator, prefixComparator, pointerArray,
         canUseRadixSort);
 
       // We cannot use the destructive iterator here because we are reusing the existing memory

http://git-wip-us.apache.org/repos/asf/spark/blob/16948345/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 3d869c7..57ef468 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.map.BytesToBytesMap
 import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 
 /**
@@ -204,4 +205,42 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
       spill = true
     )
   }
+
+  test("SPARK-23376: Create UnsafeKVExternalSorter with BytesToByteMap having duplicated
keys") {
+    val memoryManager = new TestMemoryManager(new SparkConf())
+    val taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
+    val map = new BytesToBytesMap(taskMemoryManager, 64, taskMemoryManager.pageSizeBytes())
+
+    // Key/value are a unsafe rows with a single int column
+    val schema = new StructType().add("i", IntegerType)
+    val key = new UnsafeRow(1)
+    key.pointTo(new Array[Byte](32), 32)
+    key.setInt(0, 1)
+    val value = new UnsafeRow(1)
+    value.pointTo(new Array[Byte](32), 32)
+    value.setInt(0, 2)
+
+    for (_ <- 1 to 65) {
+      val loc = map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes)
+      loc.append(
+        key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
+        value.getBaseObject, value.getBaseOffset, value.getSizeInBytes)
+    }
+
+    // Make sure we can successfully create a UnsafeKVExternalSorter with a `BytesToBytesMap`
+    // which has duplicated keys and the number of entries exceeds its capacity.
+    try {
+      TaskContext.setTaskContext(new TaskContextImpl(0, 0, 0, 0, taskMemoryManager, null, null))
+      new UnsafeKVExternalSorter(
+        schema,
+        schema,
+        sparkContext.env.blockManager,
+        sparkContext.env.serializerManager,
+        taskMemoryManager.pageSizeBytes(),
+        Int.MaxValue,
+        map)
+    } finally {
+      TaskContext.unset()
+    }
+  }
 }



