From: davies@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-13850] Force the sorter to Spill when number of elements in th…
Date: Thu, 30 Jun 2016 17:53:22 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master 5344bade8 -> 07f46afc7


[SPARK-13850] Force the sorter to Spill when number of elements in th…

## What changes were proposed in this pull request?

Force the sorter to spill when the number of elements in the pointer array reaches a certain size. This works around the issue of TimSort failing on large buffer sizes.

## How was this patch tested?

Tested by running a job that was failing without this change due to the TimSort bug.

Author: Sital Kedia

Closes #13107 from sitalkedia/fix_TimSort.
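In essence, the patch adds a single guard: before inserting a record, the sorter checks whether the in-memory record count has crossed a configurable threshold and, if so, forces a spill so the pointer array never grows to a size where TimSort misbehaves. Below is a minimal, self-contained sketch of that pattern; the class and method names (ThresholdedSorter, insert, spill) are illustrative only, not Spark's actual API, and a real sorter would sort and write the buffered records to disk rather than discard them.

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the spill guard added by SPARK-13850.
public class ThresholdedSorter {
  // Mirrors the default added to ShuffleExternalSorter: 1024 * 1024 * 1024
  // elements (8 bytes per pointer => at most an 8G pointer array).
  private static final long DEFAULT_SPILL_THRESHOLD = 1024L * 1024 * 1024;

  private final long numElementsForSpillThreshold;
  private final List<Long> inMemoryRecords = new ArrayList<>();

  public ThresholdedSorter(long numElementsForSpillThreshold) {
    this.numElementsForSpillThreshold = numElementsForSpillThreshold;
  }

  public ThresholdedSorter() {
    this(DEFAULT_SPILL_THRESHOLD);
  }

  public void insert(long record) {
    // The guard: spill *before* the buffer grows past the point where
    // sorting it becomes unsafe, instead of only when memory runs out.
    if (inMemoryRecords.size() >= numElementsForSpillThreshold) {
      System.out.println("Spilling: record count crossed threshold "
          + numElementsForSpillThreshold);
      spill();
    }
    inMemoryRecords.add(record);
  }

  private void spill() {
    // A real sorter would sort the buffer and write it out as a spill file;
    // this sketch just clears it to keep the example self-contained.
    inMemoryRecords.clear();
  }
}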
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07f46afc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07f46afc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07f46afc

Branch: refs/heads/master
Commit: 07f46afc733b1718d528a6ea5c0d774f047024fa
Parents: 5344bad
Author: Sital Kedia
Authored: Thu Jun 30 10:53:18 2016 -0700
Committer: Davies Liu
Committed: Thu Jun 30 10:53:18 2016 -0700

----------------------------------------------------------------------
 .../shuffle/sort/ShuffleExternalSorter.java     | 10 ++++++---
 .../unsafe/sort/UnsafeExternalSorter.java       | 23 +++++++++++++++++---
 .../unsafe/sort/UnsafeExternalSorterSuite.java  |  3 +++
 .../sql/execution/UnsafeExternalRowSorter.java  |  2 ++
 .../UnsafeFixedWidthAggregationMap.java         |  3 +++
 .../sql/execution/UnsafeKVExternalSorter.java   |  8 +++++--
 .../apache/spark/sql/execution/WindowExec.scala |  2 ++
 .../execution/datasources/WriterContainer.scala |  5 ++++-
 .../execution/joins/CartesianProductExec.scala  |  2 ++
 .../execution/streaming/FileStreamSink.scala    |  5 ++++-
 .../execution/UnsafeKVExternalSorterSuite.scala |  4 +++-
 .../spark/sql/hive/hiveWriterContainers.scala   |  5 ++++-
 12 files changed, 60 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index 014aef8..696ee73 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -72,7 +72,10 @@ final class ShuffleExternalSorter extends MemoryConsumer {
   private final TaskContext taskContext;
   private final ShuffleWriteMetrics writeMetrics;
 
-  /** Force this sorter to spill when there are this many elements in memory. For testing only */
+  /**
+   * Force this sorter to spill when there are this many elements in memory. The default value is
+   * 1024 * 1024 * 1024, which allows the maximum size of the pointer array to be 8G.
+   */
   private final long numElementsForSpillThreshold;
 
   /** The buffer size to use when writing spills using DiskBlockObjectWriter */
@@ -114,7 +117,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
     this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
     this.numElementsForSpillThreshold =
-      conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", Long.MAX_VALUE);
+      conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 1024 * 1024 * 1024);
     this.writeMetrics = writeMetrics;
     this.inMemSorter = new ShuffleInMemorySorter(
       this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true));
@@ -372,7 +375,8 @@ final class ShuffleExternalSorter extends MemoryConsumer {
 
     // for tests
     assert(inMemSorter != null);
-    if (inMemSorter.numRecords() > numElementsForSpillThreshold) {
+    if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
+      logger.info("Spilling data because number of spilledRecords crossed the threshold " + numElementsForSpillThreshold);
       spill();
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index ec15f0b..d6a255e 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.spark.SparkEnv;
 import org.apache.spark.TaskContext;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.memory.MemoryConsumer;
@@ -60,6 +61,13 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
   private final int fileBufferSizeBytes;
 
   /**
+   * Force this sorter to spill when there are this many elements in memory. The default value is
+   * 1024 * 1024 * 1024 / 2 which allows the maximum size of the pointer array to be 8G.
+   */
+  public static final long DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD = 1024 * 1024 * 1024 / 2;
+
+  private final long numElementsForSpillThreshold;
+
+  /**
    * Memory pages that hold the records being sorted.
   * The pages in this list are freed when spilling, although in principle we could recycle these
   * pages across spills (on the other hand, this might not be necessary if we maintained a pool of
   * re-usable pages in the TaskMemoryManager
@@ -88,9 +96,10 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
       PrefixComparator prefixComparator,
       int initialSize,
       long pageSizeBytes,
+      long numElementsForSpillThreshold,
       UnsafeInMemorySorter inMemorySorter) throws IOException {
     UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager,
-      serializerManager, taskContext, recordComparator, prefixComparator, initialSize,
+      serializerManager, taskContext, recordComparator, prefixComparator, initialSize, numElementsForSpillThreshold,
       pageSizeBytes, inMemorySorter, false /* ignored */);
     sorter.spill(Long.MAX_VALUE, sorter);
     // The external sorter will be used to insert records, in-memory sorter is not needed.
@@ -107,9 +116,10 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
       PrefixComparator prefixComparator,
       int initialSize,
       long pageSizeBytes,
+      long numElementsForSpillThreshold,
       boolean canUseRadixSort) {
     return new UnsafeExternalSorter(taskMemoryManager, blockManager, serializerManager,
-      taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, null,
+      taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, numElementsForSpillThreshold, null,
       canUseRadixSort);
   }
 
@@ -122,6 +132,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
       PrefixComparator prefixComparator,
       int initialSize,
       long pageSizeBytes,
+      long numElementsForSpillThreshold,
       @Nullable UnsafeInMemorySorter existingInMemorySorter,
       boolean canUseRadixSort) {
     super(taskMemoryManager, pageSizeBytes, taskMemoryManager.getTungstenMemoryMode());
@@ -143,6 +154,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
       this.inMemSorter = existingInMemorySorter;
     }
     this.peakMemoryUsedBytes = getMemoryUsage();
+    this.numElementsForSpillThreshold = numElementsForSpillThreshold;
 
     // Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
     // the end of the task. This is necessary to avoid memory leaks in when the downstream operator
@@ -373,6 +385,12 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
       Object recordBase, long recordOffset, int length, long prefix, boolean prefixIsNull)
     throws IOException {
 
+    assert(inMemSorter != null);
+    if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
+      logger.info("Spilling data because number of spilledRecords crossed the threshold " + numElementsForSpillThreshold);
+      spill();
+    }
+
     growPointerArrayIfNecessary();
     // Need 4 bytes to store the record length.
     final int required = length + 4;
@@ -384,7 +402,6 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
     pageCursor += 4;
     Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
     pageCursor += length;
-    assert(inMemSorter != null);
     inMemSorter.insertRecord(recordAddress, prefix, prefixIsNull);
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index bce958c..3ea9923 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -176,6 +176,7 @@ public class UnsafeExternalSorterSuite {
       prefixComparator,
       /* initialSize */ 1024,
       pageSizeBytes,
+      UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD,
       shouldUseRadixSort());
   }
 
@@ -399,6 +400,7 @@ public class UnsafeExternalSorterSuite {
       null,
       /* initialSize */ 1024,
       pageSizeBytes,
+      UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD,
       shouldUseRadixSort());
     long[] record = new long[100];
     int recordSize = record.length * 8;
@@ -435,6 +437,7 @@ public class UnsafeExternalSorterSuite {
       prefixComparator,
       1024,
       pageSizeBytes,
+      UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD,
       shouldUseRadixSort());
 
     // Peak memory should be monotonically increasing. More specifically, every time
 

http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index 0b177ad..b4e87c3 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -89,6 +89,8 @@ public final class UnsafeExternalRowSorter {
       sparkEnv.conf().getInt("spark.shuffle.sort.initialBufferSize",
         DEFAULT_INITIAL_SORT_BUFFER_SIZE),
       pageSizeBytes,
+      SparkEnv.get().conf().getLong("spark.shuffle.spill.numElementsForceSpillThreshold", UnsafeExternalSorter
+        .DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
       canUseRadixSort
     );
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index 1f1b538..3705291 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -29,6 +29,7 @@ import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.KVIterator;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.map.BytesToBytesMap;
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter;
 
 /**
  * Unsafe-based HashMap for performing aggregations where the aggregated values are fixed-width.
@@ -246,6 +247,8 @@ public final class UnsafeFixedWidthAggregationMap {
       SparkEnv.get().blockManager(),
       SparkEnv.get().serializerManager(),
       map.getPageSizeBytes(),
+      SparkEnv.get().conf().getLong("spark.shuffle.spill.numElementsForceSpillThreshold", UnsafeExternalSorter
+        .DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
       map);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index b1cc523..daa948f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -55,8 +55,9 @@ public final class UnsafeKVExternalSorter {
       StructType valueSchema,
       BlockManager blockManager,
       SerializerManager serializerManager,
-      long pageSizeBytes) throws IOException {
-    this(keySchema, valueSchema, blockManager, serializerManager, pageSizeBytes, null);
+      long pageSizeBytes,
+      long numElementsForSpillThreshold) throws IOException {
+    this(keySchema, valueSchema, blockManager, serializerManager, pageSizeBytes, numElementsForSpillThreshold, null);
   }
 
   public UnsafeKVExternalSorter(
@@ -65,6 +66,7 @@ public final class UnsafeKVExternalSorter {
       BlockManager blockManager,
       SerializerManager serializerManager,
       long pageSizeBytes,
+      long numElementsForSpillThreshold,
       @Nullable BytesToBytesMap map) throws IOException {
     this.keySchema = keySchema;
     this.valueSchema = valueSchema;
@@ -90,6 +92,7 @@ public final class UnsafeKVExternalSorter {
         SparkEnv.get().conf().getInt("spark.shuffle.sort.initialBufferSize",
           UnsafeExternalRowSorter.DEFAULT_INITIAL_SORT_BUFFER_SIZE),
         pageSizeBytes,
+        numElementsForSpillThreshold,
         canUseRadixSort);
     } else {
       // The array will be used to do in-place sort, which require half of the space to be empty.
@@ -136,6 +139,7 @@ public final class UnsafeKVExternalSorter {
         SparkEnv.get().conf().getInt("spark.shuffle.sort.initialBufferSize",
           UnsafeExternalRowSorter.DEFAULT_INITIAL_SORT_BUFFER_SIZE),
         pageSizeBytes,
+        numElementsForSpillThreshold,
         inMemSorter);
 
       // reset the map, so we can re-use it to insert new records.
      // the inMemSorter will not used
 

http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala
index 1b9634c..93f007f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala
@@ -345,6 +345,8 @@ case class WindowExec(
             null,
             1024,
             SparkEnv.get.memoryManager.pageSizeBytes,
+            SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+              UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
             false)
           rows.foreach { r =>
             sorter.insertRecord(r.getBaseObject, r.getBaseOffset, r.getSizeInBytes, 0, false)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index f56b50a..9a0b46c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 
 /**
  * A container for all the details required when writing to a table.
  */
@@ -389,7 +390,9 @@ private[sql] class DynamicPartitionWriterContainer(
         StructType.fromAttributes(dataColumns),
         SparkEnv.get.blockManager,
         SparkEnv.get.serializerManager,
-        TaskContext.get().taskMemoryManager().pageSizeBytes)
+        TaskContext.get().taskMemoryManager().pageSizeBytes,
+        SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+          UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
 
       while (iterator.hasNext) {
         val currentRow = iterator.next()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
index d870d91..0553086 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
@@ -49,6 +49,8 @@ class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numField
       null,
       1024,
       SparkEnv.get.memoryManager.pageSizeBytes,
+      SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+        UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
       false)
 
     val partition = split.asInstanceOf[CartesianPartition]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index efb0491..117d667 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.UnsafeKVExternalSorter
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, PartitioningUtils}
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 
 object FileStreamSink {
   // The name of the subdirectory that is used to store metadata about which files are valid.
@@ -209,7 +210,9 @@ class FileStreamSinkWriter(
         StructType.fromAttributes(writeColumns),
         SparkEnv.get.blockManager,
         SparkEnv.get.serializerManager,
-        TaskContext.get().taskMemoryManager().pageSizeBytes)
+        TaskContext.get().taskMemoryManager().pageSizeBytes,
+        SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+          UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
 
       while (iterator.hasNext) {
         val currentRow = iterator.next()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 03d4be8..3d869c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 
 /**
  * Test suite for [[UnsafeKVExternalSorter]], with randomly generated test data.
@@ -123,7 +124,8 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
       metricsSystem = null))
 
     val sorter = new UnsafeKVExternalSorter(
-      keySchema, valueSchema, SparkEnv.get.blockManager, SparkEnv.get.serializerManager, pageSize)
+      keySchema, valueSchema, SparkEnv.get.blockManager, SparkEnv.get.serializerManager,
+      pageSize, UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD)
 
     // Insert the keys and values into the sorter
     inputData.foreach { case (k, v) =>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 794fe26..e65c24e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.execution.UnsafeKVExternalSorter
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableJobConf
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 
 /**
  * Internal helper class that saves an RDD using a Hive OutputFormat.
@@ -280,7 +281,9 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
       StructType.fromAttributes(dataOutput),
       SparkEnv.get.blockManager,
       SparkEnv.get.serializerManager,
-      TaskContext.get().taskMemoryManager().pageSizeBytes)
+      TaskContext.get().taskMemoryManager().pageSizeBytes,
+      SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+        UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
 
     while (iterator.hasNext) {
       val inputRow = iterator.next()
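Every call site above resolves the threshold from the same (undocumented) configuration key, falling back to the sorter's default. Below is a minimal sketch of that lookup using SparkConf's getLong accessor; the wrapper class and the sample value are illustrative only, assuming spark-core is on the classpath.

import org.apache.spark.SparkConf;

// Illustrative helper; this class is not part of the commit itself.
public class SpillThresholdConfig {
  // Same default as UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD
  // in the diff above.
  static final long DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD = 1024L * 1024 * 1024 / 2;

  static long resolve(SparkConf conf) {
    // Read the force-spill threshold, defaulting when the key is unset.
    return conf.getLong(
        "spark.shuffle.spill.numElementsForceSpillThreshold",
        DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD);
  }

  public static void main(String[] args) {
    // Lowering the threshold forces earlier spills, e.g. to exercise the
    // spill path in tests without a huge input.
    SparkConf conf = new SparkConf()
        .set("spark.shuffle.spill.numElementsForceSpillThreshold", "1000000");
    System.out.println("Effective spill threshold: " + resolve(conf));
  }
}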