spark-commits mailing list archives

From r...@apache.org
Subject [2/2] spark git commit: [SPARK-7081] Faster sort-based shuffle path using binary processing cache-aware sort
Date Thu, 14 May 2015 00:07:37 GMT
[SPARK-7081] Faster sort-based shuffle path using binary processing cache-aware sort

This patch introduces a new shuffle manager that enhances the existing sort-based shuffle with a new cache-friendly sort algorithm that operates directly on binary data. The goals of this patch are to lower memory usage and Java object overheads during shuffle and to speed up sorting. It also lays groundwork for follow-up patches that will enable end-to-end processing of serialized records.

The new shuffle manager, `UnsafeShuffleManager`, can be enabled by setting `spark.shuffle.manager=tungsten-sort` in SparkConf.

The new shuffle manager uses directly-managed memory to implement several performance optimizations for certain types of shuffles. In cases where the new performance optimizations cannot be applied, the new shuffle manager delegates to SortShuffleManager to handle those shuffles.

UnsafeShuffleManager's optimizations will apply when _all_ of the following conditions hold:

 - The shuffle dependency specifies no aggregation or output ordering.
 - The shuffle serializer supports relocation of serialized values (this is currently supported
   by KryoSerializer and Spark SQL's custom serializers).
 - The shuffle produces fewer than 16777216 output partitions.
 - No individual record is larger than 128 MB when serialized.
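
As a rough illustration of the conditions listed above, the up-front check can be thought of as a single predicate. This is a minimal sketch, not the patch's code: the class `UnsafeShuffleEligibilitySketch`, the method `canUseUnsafeShuffle`, and its parameters are hypothetical names. The per-record 128 MB limit is left out because it depends on the data being written rather than on the shuffle dependency.

```java
// Hypothetical sketch of the eligibility conditions; names are illustrative only.
final class UnsafeShuffleEligibilitySketch {

    /** Exclusive upper bound on output partitions, as stated in the list above. */
    static final int MAX_OUTPUT_PARTITIONS = 1 << 24; // 16777216

    /**
     * Returns true only when every dependency-level condition holds.
     * Each parameter is an assumed summary of one property of the shuffle.
     */
    static boolean canUseUnsafeShuffle(
            boolean hasAggregation,
            boolean hasOutputOrdering,
            boolean serializerSupportsRelocation,
            int numPartitions) {
        return !hasAggregation
            && !hasOutputOrdering
            && serializerSupportsRelocation
            && numPartitions < MAX_OUTPUT_PARTITIONS;
    }

    public static void main(String[] args) {
        // A plain repartitioning shuffle with a relocatable serializer qualifies.
        System.out.println(canUseUnsafeShuffle(false, false, true, 200));
        // A shuffle that aggregates would fall back to SortShuffleManager.
        System.out.println(canUseUnsafeShuffle(true, false, true, 200));
    }
}
```

When the predicate is false, the description above says the shuffle is delegated to SortShuffleManager.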

In addition, extra spill-merging optimizations are automatically applied when the shuffle compression codec supports concatenation of serialized streams. This is currently supported by Spark's LZF compression codec.

At a high level, UnsafeShuffleManager's design is similar to Spark's existing SortShuffleManager.  In sort-based shuffle, incoming records are sorted according to their target partition ids, then written to a single map output file. Reducers fetch contiguous regions of this file in order to read their portion of the map output. In cases where the map output data is too large to fit in memory, sorted subsets of the output are spilled to disk and those on-disk files are merged to produce the final output file.
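
The sort-then-slice idea described above can be sketched in a few lines. This is a simplified illustration under stated assumptions, not Spark's implementation: `SortShuffleSketch`, `Rec`, `sortAndCountPerPartition`, and `regionOffsets` are hypothetical names, records are plain Java objects, and regions are measured in record counts rather than in bytes of a file.

```java
import java.util.Arrays;
import java.util.Comparator;

// Hypothetical sketch: sort records by target partition, then derive each
// partition's contiguous region in the sorted output.
final class SortShuffleSketch {

    /** A record tagged with its target partition id (illustrative type). */
    static final class Rec {
        final int partitionId;
        final String payload;

        Rec(int partitionId, String payload) {
            this.partitionId = partitionId;
            this.payload = payload;
        }
    }

    /** Sorts in place by partition id (stable) and returns the record count per partition. */
    static int[] sortAndCountPerPartition(Rec[] records, int numPartitions) {
        Arrays.sort(records, Comparator.comparingInt((Rec r) -> r.partitionId));
        int[] counts = new int[numPartitions];
        for (Rec r : records) {
            counts[r.partitionId]++;
        }
        return counts;
    }

    /** offsets[p] is where partition p starts; offsets[p + 1] is where it ends. */
    static int[] regionOffsets(int[] counts) {
        int[] offsets = new int[counts.length + 1];
        for (int p = 0; p < counts.length; p++) {
            offsets[p + 1] = offsets[p] + counts[p];
        }
        return offsets;
    }

    public static void main(String[] args) {
        Rec[] records = {
            new Rec(2, "a"), new Rec(0, "b"), new Rec(1, "c"), new Rec(0, "d")
        };
        int[] offsets = regionOffsets(sortAndCountPerPartition(records, 3));
        // A reducer for partition p would read the slice [offsets[p], offsets[p + 1]).
        System.out.println(Arrays.toString(offsets));
    }
}
```

Because the sorted output is contiguous per partition, a reducer only needs the start and end of its own region to read its portion.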

UnsafeShuffleManager optimizes this process in several ways:

 - Its sort operates on serialized binary data rather than Java objects, which reduces memory consumption and GC overheads. This optimization requires the record serializer to have certain properties to allow serialized records to be re-ordered without requiring deserialization.  See SPARK-4550, where this optimization was first proposed and implemented, for more details.

 - It uses a specialized cache-efficient sorter (UnsafeShuffleExternalSorter) that sorts arrays of compressed record pointers and partition ids. By using only 8 bytes of space per record in the sorting array, this fits more of the array into cache.

 - The spill merging procedure operates on blocks of serialized records that belong to the same partition and does not need to deserialize records during the merge.

 - When the spill compression codec supports concatenation of compressed data, the spill merge simply concatenates the serialized and compressed spill partitions to produce the final output partition.  This allows efficient data copying methods, like NIO's `transferTo`, to be used and avoids the need to allocate decompression or copying buffers during the merge.
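
The concatenation-based merge in the last bullet can be sketched as follows. This is a minimal illustration, assuming each spill file stores its partitions back to back and that the byte length of every partition in every spill is known; `SpillConcatSketch` and `mergeByConcatenation` are hypothetical names, and the sketch copies raw bytes without any compression codec involved.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch of merging spill files by concatenating partition segments.
final class SpillConcatSketch {

    /**
     * For each partition, appends that partition's segment from every spill to
     * the output file. partitionLengths[s][p] is the byte length of partition p
     * in spill s. Returns the byte length of each partition in the output.
     */
    static long[] mergeByConcatenation(
            Path[] spills, long[][] partitionLengths, int numPartitions, Path output)
            throws IOException {
        long[] outputLengths = new long[numPartitions];
        long[] readPositions = new long[spills.length];
        FileChannel[] inputs = new FileChannel[spills.length];
        try (FileChannel out = FileChannel.open(output, StandardOpenOption.CREATE,
                StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
            for (int s = 0; s < spills.length; s++) {
                inputs[s] = FileChannel.open(spills[s], StandardOpenOption.READ);
            }
            for (int p = 0; p < numPartitions; p++) {
                for (int s = 0; s < spills.length; s++) {
                    long remaining = partitionLengths[s][p];
                    // transferTo may copy fewer bytes than requested, so loop.
                    while (remaining > 0) {
                        long copied = inputs[s].transferTo(readPositions[s], remaining, out);
                        if (copied <= 0) {
                            throw new IOException("Unexpected end of spill " + spills[s]);
                        }
                        readPositions[s] += copied;
                        remaining -= copied;
                    }
                    outputLengths[p] += partitionLengths[s][p];
                }
            }
        } finally {
            for (FileChannel in : inputs) {
                if (in != null) {
                    in.close();
                }
            }
        }
        return outputLengths;
    }

    public static void main(String[] args) throws IOException {
        Path spill0 = Files.createTempFile("spill0", ".bin");
        Path spill1 = Files.createTempFile("spill1", ".bin");
        Path merged = Files.createTempFile("merged", ".bin");
        Files.write(spill0, "AAbbb".getBytes(StandardCharsets.US_ASCII)); // p0 = "AA", p1 = "bbb"
        Files.write(spill1, "Cd".getBytes(StandardCharsets.US_ASCII));    // p0 = "C",  p1 = "d"
        mergeByConcatenation(
            new Path[] {spill0, spill1}, new long[][] {{2, 3}, {1, 1}}, 2, merged);
        System.out.println(new String(Files.readAllBytes(merged), StandardCharsets.US_ASCII));
        Files.deleteIfExists(spill0);
        Files.deleteIfExists(spill1);
        Files.deleteIfExists(merged);
    }
}
```

No record is deserialized or decompressed here, which is why this path only applies when concatenating the compressed segments of a partition still yields a readable stream.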

The shuffle read path is unchanged.

This patch is similar to [SPARK-4550](http://issues.apache.org/jira/browse/SPARK-4550) / #4450 but uses a slightly different implementation. The `unsafe`-based implementation featured in this patch lays the groundwork for followup patches that will enable sorting to operate on serialized data pages that will be prepared by Spark SQL's new `unsafe` operators (such as the new aggregation operator introduced in #5725).

### Future work

There are several tasks that build upon this patch, which will be left to future work:

- [SPARK-7271](https://issues.apache.org/jira/browse/SPARK-7271) Redesign / extend the shuffle interfaces to accept binary data as input. The goal here is to let us bypass serialization steps in cases where the sort input is produced by an operator that operates directly on binary data.
- Extension / redesign of the `Serializer` API. We can add new methods which allow serializers to determine the size requirements for serializing objects and for serializing objects directly to a specified memory address (similar to how `UnsafeRowConverter` works in Spark SQL).


Author: Josh Rosen <joshrosen@databricks.com>

Closes #5868 from JoshRosen/unsafe-sort and squashes the following commits:

ef0a86e [Josh Rosen] Fix scalastyle errors
7610f2f [Josh Rosen] Add tests for proper cleanup of shuffle data.
d494ffe [Josh Rosen] Fix deserialization of JavaSerializer instances.
52a9981 [Josh Rosen] Fix some bugs in the address packing code.
51812a7 [Josh Rosen] Change shuffle manager sort name to tungsten-sort
4023fa4 [Josh Rosen] Add @Private annotation to some Java classes.
de40b9d [Josh Rosen] More comments to try to explain metrics code
df07699 [Josh Rosen] Attempt to clarify confusing metrics update code
5e189c6 [Josh Rosen] Track time spend closing / flushing files; split TimeTrackingOutputStream into separate file.
d5779c6 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort
c2ce78e [Josh Rosen] Fix a missed usage of MAX_PARTITION_ID
e3b8855 [Josh Rosen] Cleanup in UnsafeShuffleWriter
4a2c785 [Josh Rosen] rename 'sort buffer' to 'pointer array'
6276168 [Josh Rosen] Remove ability to disable spilling in UnsafeShuffleExternalSorter.
57312c9 [Josh Rosen] Clarify fileBufferSize units
2d4e4f4 [Josh Rosen] Address some minor comments in UnsafeShuffleExternalSorter.
fdcac08 [Josh Rosen] Guard against overflow when expanding sort buffer.
85da63f [Josh Rosen] Cleanup in UnsafeShuffleSorterIterator.
0ad34da [Josh Rosen] Fix off-by-one in nextInt() call
56781a1 [Josh Rosen] Rename UnsafeShuffleSorter to UnsafeShuffleInMemorySorter
e995d1a [Josh Rosen] Introduce MAX_SHUFFLE_OUTPUT_PARTITIONS.
e58a6b4 [Josh Rosen] Add more tests for PackedRecordPointer encoding.
4f0b770 [Josh Rosen] Attempt to implement proper shuffle write metrics.
d4e6d89 [Josh Rosen] Update to bit shifting constants
69d5899 [Josh Rosen] Remove some unnecessary override vals
8531286 [Josh Rosen] Add tests that automatically trigger spills.
7c953f9 [Josh Rosen] Add test that covers UnsafeShuffleSortDataFormat.swap().
e1855e5 [Josh Rosen] Fix a handful of misc. IntelliJ inspections
39434f9 [Josh Rosen] Avoid integer multiplication overflow in getMemoryUsage (thanks FindBugs!)
1e3ad52 [Josh Rosen] Delete unused ByteBufferOutputStream class.
ea4f85f [Josh Rosen] Roll back an unnecessary change in Spillable.
ae538dc [Josh Rosen] Document UnsafeShuffleManager.
ec6d626 [Josh Rosen] Add notes on maximum # of supported shuffle partitions.
0d4d199 [Josh Rosen] Bump up shuffle.memoryFraction to make tests pass.
b3b1924 [Josh Rosen] Properly implement close() and flush() in DummySerializerInstance.
1ef56c7 [Josh Rosen] Revise compression codec support in merger; test cross product of configurations.
b57c17f [Josh Rosen] Disable some overly-verbose logs that rendered DEBUG useless.
f780fb1 [Josh Rosen] Add test demonstrating which compression codecs support concatenation.
4a01c45 [Josh Rosen] Remove unnecessary log message
27b18b0 [Josh Rosen] That for inserting records AT the max record size.
fcd9a3c [Josh Rosen] Add notes + tests for maximum record / page sizes.
9d1ee7c [Josh Rosen] Fix MiMa excludes for ShuffleWriter change
fd4bb9e [Josh Rosen] Use own ByteBufferOutputStream rather than Kryo's
67d25ba [Josh Rosen] Update Exchange operator's copying logic to account for new shuffle manager
8f5061a [Josh Rosen] Strengthen assertion to check partitioning
01afc74 [Josh Rosen] Actually read data in UnsafeShuffleWriterSuite
1929a74 [Josh Rosen] Update to reflect upstream ShuffleBlockManager -> ShuffleBlockResolver rename.
e8718dd [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort
9b7ebed [Josh Rosen] More defensive programming RE: cleaning up spill files and memory after errors
7cd013b [Josh Rosen] Begin refactoring to enable proper tests for spilling.
722849b [Josh Rosen] Add workaround for transferTo() bug in merging code; refactor tests.
9883e30 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort
b95e642 [Josh Rosen] Refactor and document logic that decides when to spill.
1ce1300 [Josh Rosen] More minor cleanup
5e8cf75 [Josh Rosen] More minor cleanup
e67f1ea [Josh Rosen] Remove upper type bound in ShuffleWriter interface.
cfe0ec4 [Josh Rosen] Address a number of minor review comments:
8a6fe52 [Josh Rosen] Rename UnsafeShuffleSpillWriter to UnsafeShuffleExternalSorter
11feeb6 [Josh Rosen] Update TODOs related to shuffle write metrics.
b674412 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort
aaea17b [Josh Rosen] Add comments to UnsafeShuffleSpillWriter.
4f70141 [Josh Rosen] Fix merging; now passes UnsafeShuffleSuite tests.
133c8c9 [Josh Rosen] WIP towards testing UnsafeShuffleWriter.
f480fb2 [Josh Rosen] WIP in mega-refactoring towards shuffle-specific sort.
57f1ec0 [Josh Rosen] WIP towards packed record pointers for use in optimized shuffle sort.
69232fd [Josh Rosen] Enable compressible address encoding for off-heap mode.
7ee918e [Josh Rosen] Re-order imports in tests
3aeaff7 [Josh Rosen] More refactoring and cleanup; begin cleaning iterator interfaces
3490512 [Josh Rosen] Misc. cleanup
f156a8f [Josh Rosen] Hacky metrics integration; refactor some interfaces.
2776aca [Josh Rosen] First passing test for ExternalSorter.
5e100b2 [Josh Rosen] Super-messy WIP on external sort
595923a [Josh Rosen] Remove some unused variables.
8958584 [Josh Rosen] Fix bug in calculating free space in current page.
f17fa8f [Josh Rosen] Add missing newline
c2fca17 [Josh Rosen] Small refactoring of SerializerPropertiesSuite to enable test re-use:
b8a09fe [Josh Rosen] Back out accidental log4j.properties change
bfc12d3 [Josh Rosen] Add tests for serializer relocation property.
240864c [Josh Rosen] Remove PrefixComputer and require prefix to be specified as part of insert()
1433b42 [Josh Rosen] Store record length as int instead of long.
026b497 [Josh Rosen] Re-use a buffer in UnsafeShuffleWriter
0748458 [Josh Rosen] Port UnsafeShuffleWriter to Java.
87e721b [Josh Rosen] Renaming and comments
d3cc310 [Josh Rosen] Flag that SparkSqlSerializer2 supports relocation
e2d96ca [Josh Rosen] Expand serializer API and use new function to help control when new UnsafeShuffle path is used.
e267cee [Josh Rosen] Fix compilation of UnsafeSorterSuite
9c6cf58 [Josh Rosen] Refactor to use DiskBlockObjectWriter.
253f13e [Josh Rosen] More cleanup
8e3ec20 [Josh Rosen] Begin code cleanup.
4d2f5e1 [Josh Rosen] WIP
3db12de [Josh Rosen] Minor simplification and sanity checks in UnsafeSorter
767d3ca [Josh Rosen] Fix invalid range in UnsafeSorter.
e900152 [Josh Rosen] Add test for empty iterator in UnsafeSorter
57a4ea0 [Josh Rosen] Make initialSize configurable in UnsafeSorter
abf7bfe [Josh Rosen] Add basic test case.
81d52c5 [Josh Rosen] WIP on UnsafeSorter


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73bed408
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73bed408
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73bed408

Branch: refs/heads/master
Commit: 73bed408fbb47dfc28063afa3898c27fbdec7735
Parents: 61d1e87
Author: Josh Rosen <joshrosen@databricks.com>
Authored: Wed May 13 17:07:31 2015 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Wed May 13 17:07:31 2015 -0700

----------------------------------------------------------------------
 core/pom.xml                                    |  10 +
 .../shuffle/unsafe/DummySerializerInstance.java |  93 ++++
 .../shuffle/unsafe/PackedRecordPointer.java     |  92 ++++
 .../apache/spark/shuffle/unsafe/SpillInfo.java  |  37 ++
 .../unsafe/UnsafeShuffleExternalSorter.java     | 422 +++++++++++++++
 .../unsafe/UnsafeShuffleInMemorySorter.java     | 124 +++++
 .../unsafe/UnsafeShuffleSortDataFormat.java     |  67 +++
 .../shuffle/unsafe/UnsafeShuffleWriter.java     | 438 +++++++++++++++
 .../spark/storage/TimeTrackingOutputStream.java |  75 +++
 .../main/scala/org/apache/spark/SparkEnv.scala  |   3 +-
 .../spark/serializer/JavaSerializer.scala       |   2 +
 .../shuffle/FileShuffleBlockResolver.scala      |   2 +-
 .../apache/spark/shuffle/ShuffleWriter.scala    |   7 +-
 .../spark/shuffle/hash/HashShuffleWriter.scala  |   2 +-
 .../spark/shuffle/sort/SortShuffleManager.scala |   2 +-
 .../spark/shuffle/sort/SortShuffleWriter.scala  |   2 +-
 .../shuffle/unsafe/UnsafeShuffleManager.scala   | 205 ++++++++
 .../spark/storage/BlockObjectWriter.scala       |  24 +-
 .../util/collection/ExternalAppendOnlyMap.scala |   2 +-
 .../spark/util/collection/ExternalSorter.scala  |   2 +-
 .../unsafe/PackedRecordPointerSuite.java        | 101 ++++
 .../UnsafeShuffleInMemorySorterSuite.java       | 132 +++++
 .../unsafe/UnsafeShuffleWriterSuite.java        | 527 +++++++++++++++++++
 .../apache/spark/io/CompressionCodecSuite.scala |  44 ++
 .../spark/serializer/JavaSerializerSuite.scala  |  29 +
 .../unsafe/UnsafeShuffleManagerSuite.scala      | 128 +++++
 .../shuffle/unsafe/UnsafeShuffleSuite.scala     | 105 ++++
 pom.xml                                         |  14 +-
 project/MimaExcludes.scala                      |   6 +
 .../apache/spark/sql/execution/Exchange.scala   |  28 +-
 unsafe/pom.xml                                  |   4 +
 .../spark/unsafe/memory/TaskMemoryManager.java  |  79 ++-
 .../unsafe/memory/TaskMemoryManagerSuite.java   |  23 +
 33 files changed, 2767 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/73bed408/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 262a332..bfa49d0 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -362,6 +362,16 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-library</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>com.novocode</groupId>
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/spark/blob/73bed408/core/src/main/java/org/apache/spark/shuffle/unsafe/DummySerializerInstance.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/DummySerializerInstance.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/DummySerializerInstance.java
new file mode 100644
index 0000000..3f746b8
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/DummySerializerInstance.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.unsafe;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import scala.reflect.ClassTag;
+
+import org.apache.spark.serializer.DeserializationStream;
+import org.apache.spark.serializer.SerializationStream;
+import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.unsafe.PlatformDependent;
+
+/**
+ * Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
+ * Our shuffle write path doesn't actually use this serializer (since we end up calling the
+ * `write()` OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
+ * around this, we pass a dummy no-op serializer.
+ */
+final class DummySerializerInstance extends SerializerInstance {
+
+  public static final DummySerializerInstance INSTANCE = new DummySerializerInstance();
+
+  private DummySerializerInstance() { }
+
+  @Override
+  public SerializationStream serializeStream(final OutputStream s) {
+    return new SerializationStream() {
+      @Override
+      public void flush() {
+        // Need to implement this because DiskBlockObjectWriter uses it to flush the compression stream
+        try {
+          s.flush();
+        } catch (IOException e) {
+          PlatformDependent.throwException(e);
+        }
+      }
+
+      @Override
+      public <T> SerializationStream writeObject(T t, ClassTag<T> ev1) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void close() {
+        // Need to implement this because DiskBlockObjectWriter uses it to close the compression stream
+        try {
+          s.close();
+        } catch (IOException e) {
+          PlatformDependent.throwException(e);
+        }
+      }
+    };
+  }
+
+  @Override
+  public <T> ByteBuffer serialize(T t, ClassTag<T> ev1) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public DeserializationStream deserializeStream(InputStream s) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T> T deserialize(ByteBuffer bytes, ClassLoader loader, ClassTag<T> ev1) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T> T deserialize(ByteBuffer bytes, ClassTag<T> ev1) {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/73bed408/core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java
new file mode 100644
index 0000000..4ee6a82
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.unsafe;
+
+/**
+ * Wrapper around an 8-byte word that holds a 24-bit partition number and 40-bit record pointer.
+ * <p>
+ * Within the long, the data is laid out as follows:
+ * <pre>
+ *   [24 bit partition number][13 bit memory page number][27 bit offset in page]
+ * </pre>
+ * This implies that the maximum addressable page size is 2^27 bytes = 128 megabytes, assuming that
+ * our offsets in pages are not 8-byte-word-aligned. Since we have 2^13 pages (based off the
+ * 13-bit page numbers assigned by {@link org.apache.spark.unsafe.memory.TaskMemoryManager}), this
+ * implies that we can address 2^13 * 128 megabytes = 1 terabyte of RAM per task.
+ * <p>
+ * Assuming word-alignment would allow for a 1 gigabyte maximum page size, but we leave this
+ * optimization to future work as it will require more careful design to ensure that addresses are
+ * properly aligned (e.g. by padding records).
+ */
+final class PackedRecordPointer {
+
+  static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27;  // 128 megabytes
+
+  /**
+   * The maximum partition identifier that can be encoded. Note that partition ids start from 0.
+   */
+  static final int MAXIMUM_PARTITION_ID = (1 << 24) - 1;  // 16777215
+
+  /** Bit mask for the lower 40 bits of a long. */
+  private static final long MASK_LONG_LOWER_40_BITS = (1L << 40) - 1;
+
+  /** Bit mask for the upper 24 bits of a long */
+  private static final long MASK_LONG_UPPER_24_BITS = ~MASK_LONG_LOWER_40_BITS;
+
+  /** Bit mask for the lower 27 bits of a long. */
+  private static final long MASK_LONG_LOWER_27_BITS = (1L << 27) - 1;
+
+  /** Bit mask for the lower 51 bits of a long. */
+  private static final long MASK_LONG_LOWER_51_BITS = (1L << 51) - 1;
+
+  /** Bit mask for the upper 13 bits of a long */
+  private static final long MASK_LONG_UPPER_13_BITS = ~MASK_LONG_LOWER_51_BITS;
+
+  /**
+   * Pack a record address and partition id into a single word.
+   *
+   * @param recordPointer a record pointer encoded by TaskMemoryManager.
+   * @param partitionId a shuffle partition id (maximum value of 2^24 - 1).
+   * @return a packed pointer that can be decoded using the {@link PackedRecordPointer} class.
+   */
+  public static long packPointer(long recordPointer, int partitionId) {
+    assert (partitionId <= MAXIMUM_PARTITION_ID);
+    // Note that without word alignment we can address 2^27 bytes = 128 megabytes per page.
+    // Also note that this relies on some internals of how TaskMemoryManager encodes its addresses.
+    final long pageNumber = (recordPointer & MASK_LONG_UPPER_13_BITS) >>> 24;
+    final long compressedAddress = pageNumber | (recordPointer & MASK_LONG_LOWER_27_BITS);
+    return (((long) partitionId) << 40) | compressedAddress;
+  }
+
+  private long packedRecordPointer;
+
+  public void set(long packedRecordPointer) {
+    this.packedRecordPointer = packedRecordPointer;
+  }
+
+  public int getPartitionId() {
+    return (int) ((packedRecordPointer & MASK_LONG_UPPER_24_BITS) >>> 40);
+  }
+
+  public long getRecordPointer() {
+    final long pageNumber = (packedRecordPointer << 24) & MASK_LONG_UPPER_13_BITS;
+    final long offsetInPage = packedRecordPointer & MASK_LONG_LOWER_27_BITS;
+    return pageNumber | offsetInPage;
+  }
+
+}
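
The layout arithmetic in the class comment above can be checked with a small round-trip example. This is a simplified sketch, not the class from the patch: `PackedPointerSketch` and its methods are hypothetical, and it takes the page number and in-page offset as separate arguments instead of decoding an address produced by TaskMemoryManager.

```java
// Hypothetical sketch of the [24-bit partition][13-bit page][27-bit offset] layout.
final class PackedPointerSketch {

    static final int PARTITION_BITS = 24;
    static final int PAGE_BITS = 13;
    static final int OFFSET_BITS = 27;

    /** Packs the three fields into one 64-bit word (24 + 13 + 27 = 64 bits). */
    static long pack(int partitionId, int pageNumber, long offsetInPage) {
        if (partitionId < 0 || partitionId >= (1 << PARTITION_BITS)) {
            throw new IllegalArgumentException("partition id out of range: " + partitionId);
        }
        if (pageNumber < 0 || pageNumber >= (1 << PAGE_BITS)) {
            throw new IllegalArgumentException("page number out of range: " + pageNumber);
        }
        if (offsetInPage < 0 || offsetInPage >= (1L << OFFSET_BITS)) {
            throw new IllegalArgumentException("offset out of range: " + offsetInPage);
        }
        return ((long) partitionId << (PAGE_BITS + OFFSET_BITS))
            | ((long) pageNumber << OFFSET_BITS)
            | offsetInPage;
    }

    static int partitionId(long packed) {
        // Unsigned shift so that partition ids using the top bit stay positive.
        return (int) (packed >>> (PAGE_BITS + OFFSET_BITS));
    }

    static int pageNumber(long packed) {
        return (int) ((packed >>> OFFSET_BITS) & ((1L << PAGE_BITS) - 1));
    }

    static long offsetInPage(long packed) {
        return packed & ((1L << OFFSET_BITS) - 1);
    }

    /** 2^13 pages of 2^27 bytes each give 2^40 addressable bytes. */
    static long addressableBytes() {
        return (1L << PAGE_BITS) * (1L << OFFSET_BITS);
    }

    public static void main(String[] args) {
        long packed = pack(5, 3, 1000L);
        System.out.println(partitionId(packed) + " " + pageNumber(packed) + " "
            + offsetInPage(packed));
        System.out.println(addressableBytes() == (1L << 40));
    }
}
```

The 27 offset bits are what bound a page at 128 megabytes, and the 24 partition bits are what bound the partition id at 16777215.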

http://git-wip-us.apache.org/repos/asf/spark/blob/73bed408/core/src/main/java/org/apache/spark/shuffle/unsafe/SpillInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/SpillInfo.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/SpillInfo.java
new file mode 100644
index 0000000..7bac0dc
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/SpillInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.unsafe;
+
+import java.io.File;
+
+import org.apache.spark.storage.TempShuffleBlockId;
+
+/**
+ * Metadata for a block of data written by {@link UnsafeShuffleExternalSorter}.
+ */
+final class SpillInfo {
+  final long[] partitionLengths;
+  final File file;
+  final TempShuffleBlockId blockId;
+
+  public SpillInfo(int numPartitions, File file, TempShuffleBlockId blockId) {
+    this.partitionLengths = new long[numPartitions];
+    this.file = file;
+    this.blockId = blockId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/73bed408/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
new file mode 100644
index 0000000..9e9ed94
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.unsafe;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+
+import scala.Tuple2;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.shuffle.ShuffleMemoryManager;
+import org.apache.spark.storage.*;
+import org.apache.spark.unsafe.PlatformDependent;
+import org.apache.spark.unsafe.memory.MemoryBlock;
+import org.apache.spark.unsafe.memory.TaskMemoryManager;
+import org.apache.spark.util.Utils;
+
+/**
+ * An external sorter that is specialized for sort-based shuffle.
+ * <p>
+ * Incoming records are appended to data pages. When all records have been inserted (or when the
+ * current thread's shuffle memory limit is reached), the in-memory records are sorted according to
+ * their partition ids (using a {@link UnsafeShuffleInMemorySorter}). The sorted records are then
+ * written to a single output file (or multiple files, if we've spilled). The format of the output
+ * files is the same as the format of the final output file written by
+ * {@link org.apache.spark.shuffle.sort.SortShuffleWriter}: each output partition's records are
+ * written as a single serialized, compressed stream that can be read with a new decompression and
+ * deserialization stream.
+ * <p>
+ * Unlike {@link org.apache.spark.util.collection.ExternalSorter}, this sorter does not merge its
+ * spill files. Instead, this merging is performed in {@link UnsafeShuffleWriter}, which uses a
+ * specialized merge procedure that avoids extra serialization/deserialization.
+ */
+final class UnsafeShuffleExternalSorter {
+
+  private final Logger logger = LoggerFactory.getLogger(UnsafeShuffleExternalSorter.class);
+
+  private static final int PAGE_SIZE = PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES;
+  @VisibleForTesting
+  static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024;
+  @VisibleForTesting
+  static final int MAX_RECORD_SIZE = PAGE_SIZE - 4;
+
+  private final int initialSize;
+  private final int numPartitions;
+  private final TaskMemoryManager memoryManager;
+  private final ShuffleMemoryManager shuffleMemoryManager;
+  private final BlockManager blockManager;
+  private final TaskContext taskContext;
+  private final ShuffleWriteMetrics writeMetrics;
+
+  /** The buffer size to use when writing spills using DiskBlockObjectWriter */
+  private final int fileBufferSizeBytes;
+
+  /**
+   * Memory pages that hold the records being sorted. The pages in this list are freed when
+   * spilling, although in principle we could recycle these pages across spills (on the other hand,
+   * this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager
+   * itself).
+   */
+  private final LinkedList<MemoryBlock> allocatedPages = new LinkedList<MemoryBlock>();
+
+  private final LinkedList<SpillInfo> spills = new LinkedList<SpillInfo>();
+
+  // These variables are reset after spilling:
+  private UnsafeShuffleInMemorySorter sorter;
+  private MemoryBlock currentPage = null;
+  private long currentPagePosition = -1;
+  private long freeSpaceInCurrentPage = 0;
+
+  public UnsafeShuffleExternalSorter(
+      TaskMemoryManager memoryManager,
+      ShuffleMemoryManager shuffleMemoryManager,
+      BlockManager blockManager,
+      TaskContext taskContext,
+      int initialSize,
+      int numPartitions,
+      SparkConf conf,
+      ShuffleWriteMetrics writeMetrics) throws IOException {
+    this.memoryManager = memoryManager;
+    this.shuffleMemoryManager = shuffleMemoryManager;
+    this.blockManager = blockManager;
+    this.taskContext = taskContext;
+    this.initialSize = initialSize;
+    this.numPartitions = numPartitions;
+    // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
+    this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
+
+    this.writeMetrics = writeMetrics;
+    initializeForWriting();
+  }
+
+  /**
+   * Allocates new sort data structures. Called when creating the sorter and after each spill.
+   */
+  private void initializeForWriting() throws IOException {
+    // TODO: move this sizing calculation logic into a static method of sorter:
+    final long memoryRequested = initialSize * 8L;
+    final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryRequested);
+    if (memoryAcquired != memoryRequested) {
+      shuffleMemoryManager.release(memoryAcquired);
+      throw new IOException("Could not acquire " + memoryRequested + " bytes of memory");
+    }
+
+    this.sorter = new UnsafeShuffleInMemorySorter(initialSize);
+  }
+
+  /**
+   * Sorts the in-memory records and writes the sorted records to an on-disk file.
+   * This method does not free the sort data structures.
+   *
+   * @param isLastFile if true, this indicates that we're writing the final output file and that the
+   *                   bytes written should be counted towards shuffle write metrics rather than
+   *                   shuffle spill metrics.
+   */
+  private void writeSortedFile(boolean isLastFile) throws IOException {
+
+    final ShuffleWriteMetrics writeMetricsToUse;
+
+    if (isLastFile) {
+      // We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
+      writeMetricsToUse = writeMetrics;
+    } else {
+      // We're spilling, so bytes written should be counted towards spill rather than write.
+      // Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count
+      // them towards shuffle bytes written.
+      writeMetricsToUse = new ShuffleWriteMetrics();
+    }
+
+    // This call performs the actual sort.
+    final UnsafeShuffleInMemorySorter.UnsafeShuffleSorterIterator sortedRecords =
+      sorter.getSortedIterator();
+
+    // Currently, we need to open a new DiskBlockObjectWriter for each partition; we can avoid this
+    // after SPARK-5581 is fixed.
+    BlockObjectWriter writer;
+
+    // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
+    // be an API to directly transfer bytes from managed memory to the disk writer, we buffer
+    // data through a byte array. This array does not need to be large enough to hold a single
+    // record.
+    final byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE];
+
+    // Because this output will be read during shuffle, its compression codec must be controlled by
+    // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
+    // createTempShuffleBlock here; see SPARK-3426 for more details.
+    final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
+      blockManager.diskBlockManager().createTempShuffleBlock();
+    final File file = spilledFileInfo._2();
+    final TempShuffleBlockId blockId = spilledFileInfo._1();
+    final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);
+
+    // Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
+    // Our write path doesn't actually use this serializer (since we end up calling the `write()`
+    // OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
+    // around this, we pass a dummy no-op serializer.
+    final SerializerInstance ser = DummySerializerInstance.INSTANCE;
+
+    writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);
+
+    int currentPartition = -1;
+    while (sortedRecords.hasNext()) {
+      sortedRecords.loadNext();
+      final int partition = sortedRecords.packedRecordPointer.getPartitionId();
+      assert (partition >= currentPartition);
+      if (partition != currentPartition) {
+        // Switch to the new partition
+        if (currentPartition != -1) {
+          writer.commitAndClose();
+          spillInfo.partitionLengths[currentPartition] = writer.fileSegment().length();
+        }
+        currentPartition = partition;
+        writer =
+          blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);
+      }
+
+      final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
+      final Object recordPage = memoryManager.getPage(recordPointer);
+      final long recordOffsetInPage = memoryManager.getOffsetInPage(recordPointer);
+      int dataRemaining = PlatformDependent.UNSAFE.getInt(recordPage, recordOffsetInPage);
+      long recordReadPosition = recordOffsetInPage + 4; // skip over record length
+      while (dataRemaining > 0) {
+        final int toTransfer = Math.min(DISK_WRITE_BUFFER_SIZE, dataRemaining);
+        PlatformDependent.copyMemory(
+          recordPage,
+          recordReadPosition,
+          writeBuffer,
+          PlatformDependent.BYTE_ARRAY_OFFSET,
+          toTransfer);
+        writer.write(writeBuffer, 0, toTransfer);
+        recordReadPosition += toTransfer;
+        dataRemaining -= toTransfer;
+      }
+      writer.recordWritten();
+    }
+
+    if (writer != null) {
+      writer.commitAndClose();
+      // If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted,
+      // then the file might be empty. Note that it might be better to avoid calling
+      // writeSortedFile() in that case.
+      if (currentPartition != -1) {
+        spillInfo.partitionLengths[currentPartition] = writer.fileSegment().length();
+        spills.add(spillInfo);
+      }
+    }
+
+    if (!isLastFile) {  // i.e. this is a spill file
+      // The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records
+      // are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter
+      // relies on its `recordWritten()` method being called in order to trigger periodic updates to
+      // `shuffleBytesWritten`. If we were to remove the `recordWritten()` call and increment that
+      // counter at a higher-level, then the in-progress metrics for records written and bytes
+      // written would get out of sync.
+      //
+      // When writing the last file, we pass `writeMetrics` directly to the DiskBlockObjectWriter;
+      // in all other cases, we pass in a dummy write metrics to capture metrics, then copy those
+      // metrics to the true write metrics here. The reason for performing this copying is so that
+      // we can avoid reporting spilled bytes as shuffle write bytes.
+      //
+      // Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`.
+      // Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
+      // This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this.
+      writeMetrics.incShuffleRecordsWritten(writeMetricsToUse.shuffleRecordsWritten());
+      taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.shuffleBytesWritten());
+    }
+  }
+
+  /**
+   * Sort and spill the current records in response to memory pressure.
+   */
+  @VisibleForTesting
+  void spill() throws IOException {
+    logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
+      Thread.currentThread().getId(),
+      Utils.bytesToString(getMemoryUsage()),
+      spills.size(),
+      spills.size() > 1 ? "times" : "time");
+
+    writeSortedFile(false);
+    final long sorterMemoryUsage = sorter.getMemoryUsage();
+    sorter = null;
+    shuffleMemoryManager.release(sorterMemoryUsage);
+    final long spillSize = freeMemory();
+    taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
+
+    initializeForWriting();
+  }
+
+  private long getMemoryUsage() {
+    return sorter.getMemoryUsage() + (allocatedPages.size() * (long) PAGE_SIZE);
+  }
+
+  private long freeMemory() {
+    long memoryFreed = 0;
+    for (MemoryBlock block : allocatedPages) {
+      memoryManager.freePage(block);
+      shuffleMemoryManager.release(block.size());
+      memoryFreed += block.size();
+    }
+    allocatedPages.clear();
+    currentPage = null;
+    currentPagePosition = -1;
+    freeSpaceInCurrentPage = 0;
+    return memoryFreed;
+  }
+
+  /**
+   * Force all memory and spill files to be deleted; called by shuffle error-handling code.
+   */
+  public void cleanupAfterError() {
+    freeMemory();
+    for (SpillInfo spill : spills) {
+      if (spill.file.exists() && !spill.file.delete()) {
+        logger.error("Unable to delete spill file {}", spill.file.getPath());
+      }
+    }
+    if (sorter != null) {
+      shuffleMemoryManager.release(sorter.getMemoryUsage());
+      sorter = null;
+    }
+  }
+
+  /**
+   * Checks whether there is enough space to insert a new record into the sorter.
+   *
+   * @param requiredSpace the required space in the data page, in bytes, including space for storing
+   *                      the record size.
+   *
+   * @return true if the record can be inserted without requiring more allocations, false otherwise.
+   */
+  private boolean haveSpaceForRecord(int requiredSpace) {
+    assert (requiredSpace > 0);
+    return (sorter.hasSpaceForAnotherRecord() && (requiredSpace <= freeSpaceInCurrentPage));
+  }
+
+  /**
+   * Allocates more memory in order to insert an additional record. This will request additional
+   * memory from the {@link ShuffleMemoryManager} and spill if the requested memory can not be
+   * obtained.
+   *
+   * @param requiredSpace the required space in the data page, in bytes, including space for storing
+   *                      the record size.
+   */
+  private void allocateSpaceForRecord(int requiredSpace) throws IOException {
+    if (!sorter.hasSpaceForAnotherRecord()) {
+      logger.debug("Attempting to expand sort pointer array");
+      final long oldPointerArrayMemoryUsage = sorter.getMemoryUsage();
+      final long memoryToGrowPointerArray = oldPointerArrayMemoryUsage * 2;
+      final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryToGrowPointerArray);
+      if (memoryAcquired < memoryToGrowPointerArray) {
+        shuffleMemoryManager.release(memoryAcquired);
+        spill();
+      } else {
+        sorter.expandPointerArray();
+        shuffleMemoryManager.release(oldPointerArrayMemoryUsage);
+      }
+    }
+    if (requiredSpace > freeSpaceInCurrentPage) {
+      logger.trace("Required space {} is greater than free space in current page ({})", requiredSpace,
+        freeSpaceInCurrentPage);
+      // TODO: we should track metrics on the amount of space wasted when we roll over to a new page
+      // without using the free space at the end of the current page. We should also do this for
+      // BytesToBytesMap.
+      if (requiredSpace > PAGE_SIZE) {
+        throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
+          PAGE_SIZE + ")");
+      } else {
+        final long memoryAcquired = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
+        if (memoryAcquired < PAGE_SIZE) {
+          shuffleMemoryManager.release(memoryAcquired);
+          spill();
+          final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
+          if (memoryAcquiredAfterSpilling != PAGE_SIZE) {
+            shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
+            throw new IOException("Unable to acquire " + PAGE_SIZE + " bytes of memory");
+          }
+        }
+        currentPage = memoryManager.allocatePage(PAGE_SIZE);
+        currentPagePosition = currentPage.getBaseOffset();
+        freeSpaceInCurrentPage = PAGE_SIZE;
+        allocatedPages.add(currentPage);
+      }
+    }
+  }
+
+  /**
+   * Write a record to the shuffle sorter.
+   */
+  public void insertRecord(
+      Object recordBaseObject,
+      long recordBaseOffset,
+      int lengthInBytes,
+      int partitionId) throws IOException {
+    // Need 4 bytes to store the record length.
+    final int totalSpaceRequired = lengthInBytes + 4;
+    if (!haveSpaceForRecord(totalSpaceRequired)) {
+      allocateSpaceForRecord(totalSpaceRequired);
+    }
+
+    final long recordAddress =
+      memoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition);
+    final Object dataPageBaseObject = currentPage.getBaseObject();
+    PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, lengthInBytes);
+    currentPagePosition += 4;
+    freeSpaceInCurrentPage -= 4;
+    PlatformDependent.copyMemory(
+      recordBaseObject,
+      recordBaseOffset,
+      dataPageBaseObject,
+      currentPagePosition,
+      lengthInBytes);
+    currentPagePosition += lengthInBytes;
+    freeSpaceInCurrentPage -= lengthInBytes;
+    sorter.insertRecord(recordAddress, partitionId);
+  }
+
+  /**
+   * Close the sorter, causing any buffered data to be sorted and written out to disk.
+   *
+   * @return metadata for the spill files written by this sorter. If no records were ever inserted
+   *         into this sorter, then this will return an empty array.
+   * @throws IOException
+   */
+  public SpillInfo[] closeAndGetSpills() throws IOException {
+    try {
+      if (sorter != null) {
+        // Do not count the final file towards the spill count.
+        writeSortedFile(true);
+        freeMemory();
+      }
+      return spills.toArray(new SpillInfo[spills.size()]);
+    } catch (IOException e) {
+      cleanupAfterError();
+      throw e;
+    }
+  }
+
+}
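
Editorial sketch (not part of the commit): the length-prefixed record layout written by `insertRecord()` and the buffered copy loop in `writeSortedFile()` above can be illustrated in isolation. This is a minimal sketch under stated assumptions: a heap `ByteBuffer` stands in for a managed memory page, and the class and method names (`LengthPrefixedPageSketch`, `hasSpaceFor`, `insert`, `writeRecord`) are hypothetical and do not appear in the patch.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Illustrative only: a heap ByteBuffer stands in for a page of managed memory.
final class LengthPrefixedPageSketch {
  private final ByteBuffer page;

  LengthPrefixedPageSketch(int pageSize) {
    this.page = ByteBuffer.allocate(pageSize);
  }

  /** Returns true if the record plus its 4-byte length prefix still fits in the page. */
  boolean hasSpaceFor(int lengthInBytes) {
    return lengthInBytes + 4 <= page.remaining();
  }

  /** Appends [length][payload] and returns the offset at which the record starts. */
  int insert(byte[] record) {
    if (!hasSpaceFor(record.length)) {
      // The real sorter would allocate a new page or spill at this point.
      throw new IllegalStateException("page is full");
    }
    final int offset = page.position();
    page.putInt(record.length);
    page.put(record);
    return offset;
  }

  /** Copies one record's payload to `out` in chunks no larger than `writeBuffer`. */
  void writeRecord(int offset, byte[] writeBuffer, ByteArrayOutputStream out) {
    int dataRemaining = page.getInt(offset);
    int readPosition = offset + 4; // skip over the length prefix
    while (dataRemaining > 0) {
      final int toTransfer = Math.min(writeBuffer.length, dataRemaining);
      System.arraycopy(page.array(), readPosition, writeBuffer, 0, toTransfer);
      out.write(writeBuffer, 0, toTransfer);
      readPosition += toTransfer;
      dataRemaining -= toTransfer;
    }
  }
}
```

As in the patch, the write buffer may be smaller than a single record; the loop simply makes several passes over the payload.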

http://git-wip-us.apache.org/repos/asf/spark/blob/73bed408/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleInMemorySorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleInMemorySorter.java
new file mode 100644
index 0000000..5bab501
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleInMemorySorter.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.unsafe;
+
+import java.util.Comparator;
+
+import org.apache.spark.util.collection.Sorter;
+
+final class UnsafeShuffleInMemorySorter {
+
+  private final Sorter<PackedRecordPointer, long[]> sorter;
+  private static final class SortComparator implements Comparator<PackedRecordPointer> {
+    @Override
+    public int compare(PackedRecordPointer left, PackedRecordPointer right) {
+      return left.getPartitionId() - right.getPartitionId();
+    }
+  }
+  private static final SortComparator SORT_COMPARATOR = new SortComparator();
+
+  /**
+   * An array of record pointers and partition ids that have been encoded by
+   * {@link PackedRecordPointer}. The sort operates on this array instead of directly manipulating
+   * records.
+   */
+  private long[] pointerArray;
+
+  /**
+   * The position in the pointer array where new records can be inserted.
+   */
+  private int pointerArrayInsertPosition = 0;
+
+  public UnsafeShuffleInMemorySorter(int initialSize) {
+    assert (initialSize > 0);
+    this.pointerArray = new long[initialSize];
+    this.sorter = new Sorter<PackedRecordPointer, long[]>(UnsafeShuffleSortDataFormat.INSTANCE);
+  }
+
+  public void expandPointerArray() {
+    final long[] oldArray = pointerArray;
+    // Guard against overflow:
+    final int newLength = oldArray.length * 2 > 0 ? (oldArray.length * 2) : Integer.MAX_VALUE;
+    pointerArray = new long[newLength];
+    System.arraycopy(oldArray, 0, pointerArray, 0, oldArray.length);
+  }
+
+  public boolean hasSpaceForAnotherRecord() {
+    return pointerArrayInsertPosition + 1 < pointerArray.length;
+  }
+
+  public long getMemoryUsage() {
+    return pointerArray.length * 8L;
+  }
+
+  /**
+   * Inserts a record to be sorted.
+   *
+   * @param recordPointer a pointer to the record, encoded by the task memory manager. Due to
+   *                      certain pointer compression techniques used by the sorter, the sort can
+   *                      only operate on pointers that point to locations in the first
+   *                      {@link PackedRecordPointer#MAXIMUM_PAGE_SIZE_BYTES} bytes of a data page.
+   * @param partitionId the partition id, which must be less than or equal to
+   *                    {@link PackedRecordPointer#MAXIMUM_PARTITION_ID}.
+   */
+  public void insertRecord(long recordPointer, int partitionId) {
+    if (!hasSpaceForAnotherRecord()) {
+      if (pointerArray.length == Integer.MAX_VALUE) {
+        throw new IllegalStateException("Sort pointer array has reached maximum size");
+      } else {
+        expandPointerArray();
+      }
+    }
+    pointerArray[pointerArrayInsertPosition] =
+        PackedRecordPointer.packPointer(recordPointer, partitionId);
+    pointerArrayInsertPosition++;
+  }
+
+  /**
+   * An iterator-like class that's used instead of Java's Iterator in order to facilitate inlining.
+   */
+  public static final class UnsafeShuffleSorterIterator {
+
+    private final long[] pointerArray;
+    private final int numRecords;
+    final PackedRecordPointer packedRecordPointer = new PackedRecordPointer();
+    private int position = 0;
+
+    public UnsafeShuffleSorterIterator(int numRecords, long[] pointerArray) {
+      this.numRecords = numRecords;
+      this.pointerArray = pointerArray;
+    }
+
+    public boolean hasNext() {
+      return position < numRecords;
+    }
+
+    public void loadNext() {
+      packedRecordPointer.set(pointerArray[position]);
+      position++;
+    }
+  }
+
+  /**
+   * Return an iterator over record pointers in sorted order.
+   */
+  public UnsafeShuffleSorterIterator getSortedIterator() {
+    sorter.sort(pointerArray, 0, pointerArrayInsertPosition, SORT_COMPARATOR);
+    return new UnsafeShuffleSorterIterator(pointerArrayInsertPosition, pointerArray);
+  }
+}
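
Editorial sketch (not part of the commit): the idea of sorting a `long[]` of packed pointers by partition id, rather than moving the records themselves, can be shown in a few lines. The bit layout below (partition id in the upper 24 bits, address in the lower 40) is an assumption chosen for illustration; the actual encoding lives in `PackedRecordPointer`, which is not shown in this part of the message. A simple insertion sort is swapped in for the `Sorter` used by the patch, purely for brevity, and all names here are hypothetical.

```java
// Illustrative only: the bit layout is an assumption, not the patch's actual format.
final class PackedPointerSortSketch {
  static final int PARTITION_BITS = 24;
  static final int ADDRESS_BITS = 64 - PARTITION_BITS;
  static final long ADDRESS_MASK = (1L << ADDRESS_BITS) - 1;

  /** Packs a partition id into the upper bits and an address into the lower bits. */
  static long pack(long address, int partitionId) {
    if (partitionId < 0 || partitionId >= (1 << PARTITION_BITS)) {
      throw new IllegalArgumentException("partition id out of range: " + partitionId);
    }
    return ((long) partitionId << ADDRESS_BITS) | (address & ADDRESS_MASK);
  }

  static int partitionId(long packed) {
    // Unsigned shift, so ids with the top bit set are still decoded correctly.
    return (int) (packed >>> ADDRESS_BITS);
  }

  static long address(long packed) {
    return packed & ADDRESS_MASK;
  }

  /** Sorts the first `length` entries by partition id only (stable insertion sort). */
  static void sortByPartition(long[] pointers, int length) {
    for (int i = 1; i < length; i++) {
      final long current = pointers[i];
      int j = i - 1;
      while (j >= 0 && partitionId(pointers[j]) > partitionId(current)) {
        pointers[j + 1] = pointers[j];
        j--;
      }
      pointers[j + 1] = current;
    }
  }
}
```

Because only 8-byte entries are compared and moved, the sort touches a compact array instead of chasing references to record objects, which is the cache-friendliness the commit message refers to.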

http://git-wip-us.apache.org/repos/asf/spark/blob/73bed408/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSortDataFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSortDataFormat.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSortDataFormat.java
new file mode 100644
index 0000000..a66d74e
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSortDataFormat.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.unsafe;
+
+import org.apache.spark.util.collection.SortDataFormat;
+
+final class UnsafeShuffleSortDataFormat extends SortDataFormat<PackedRecordPointer, long[]> {
+
+  public static final UnsafeShuffleSortDataFormat INSTANCE = new UnsafeShuffleSortDataFormat();
+
+  private UnsafeShuffleSortDataFormat() { }
+
+  @Override
+  public PackedRecordPointer getKey(long[] data, int pos) {
+    // Since we re-use keys, this method shouldn't be called.
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public PackedRecordPointer newKey() {
+    return new PackedRecordPointer();
+  }
+
+  @Override
+  public PackedRecordPointer getKey(long[] data, int pos, PackedRecordPointer reuse) {
+    reuse.set(data[pos]);
+    return reuse;
+  }
+
+  @Override
+  public void swap(long[] data, int pos0, int pos1) {
+    final long temp = data[pos0];
+    data[pos0] = data[pos1];
+    data[pos1] = temp;
+  }
+
+  @Override
+  public void copyElement(long[] src, int srcPos, long[] dst, int dstPos) {
+    dst[dstPos] = src[srcPos];
+  }
+
+  @Override
+  public void copyRange(long[] src, int srcPos, long[] dst, int dstPos, int length) {
+    System.arraycopy(src, srcPos, dst, dstPos, length);
+  }
+
+  @Override
+  public long[] allocate(int length) {
+    return new long[length];
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/73bed408/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
new file mode 100644
index 0000000..ad7eb04
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.unsafe;
+
+import java.io.*;
+import java.nio.channels.FileChannel;
+import java.util.Iterator;
+import javax.annotation.Nullable;
+
+import scala.Option;
+import scala.Product2;
+import scala.collection.JavaConversions;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.*;
+import org.apache.spark.annotation.Private;
+import org.apache.spark.io.CompressionCodec;
+import org.apache.spark.io.CompressionCodec$;
+import org.apache.spark.io.LZFCompressionCodec;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.network.util.LimitedInputStream;
+import org.apache.spark.scheduler.MapStatus;
+import org.apache.spark.scheduler.MapStatus$;
+import org.apache.spark.serializer.SerializationStream;
+import org.apache.spark.serializer.Serializer;
+import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.shuffle.IndexShuffleBlockResolver;
+import org.apache.spark.shuffle.ShuffleMemoryManager;
+import org.apache.spark.shuffle.ShuffleWriter;
+import org.apache.spark.storage.BlockManager;
+import org.apache.spark.storage.TimeTrackingOutputStream;
+import org.apache.spark.unsafe.PlatformDependent;
+import org.apache.spark.unsafe.memory.TaskMemoryManager;
+
+@Private
+public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
+
+  private final Logger logger = LoggerFactory.getLogger(UnsafeShuffleWriter.class);
+
+  private static final ClassTag<Object> OBJECT_CLASS_TAG = ClassTag$.MODULE$.Object();
+
+  @VisibleForTesting
+  static final int INITIAL_SORT_BUFFER_SIZE = 4096;
+
+  private final BlockManager blockManager;
+  private final IndexShuffleBlockResolver shuffleBlockResolver;
+  private final TaskMemoryManager memoryManager;
+  private final ShuffleMemoryManager shuffleMemoryManager;
+  private final SerializerInstance serializer;
+  private final Partitioner partitioner;
+  private final ShuffleWriteMetrics writeMetrics;
+  private final int shuffleId;
+  private final int mapId;
+  private final TaskContext taskContext;
+  private final SparkConf sparkConf;
+  private final boolean transferToEnabled;
+
+  private MapStatus mapStatus = null;
+  private UnsafeShuffleExternalSorter sorter = null;
+
+  /** Subclass of ByteArrayOutputStream that exposes `buf` directly. */
+  private static final class MyByteArrayOutputStream extends ByteArrayOutputStream {
+    public MyByteArrayOutputStream(int size) { super(size); }
+    public byte[] getBuf() { return buf; }
+  }
+
+  private MyByteArrayOutputStream serBuffer;
+  private SerializationStream serOutputStream;
+
+  /**
+   * Are we in the process of stopping? Because map tasks can call stop() with success = true
+   * and then call stop() with success = false if they get an exception, we want to make sure
+   * we don't try deleting files, etc twice.
+   */
+  private boolean stopping = false;
+
+  public UnsafeShuffleWriter(
+      BlockManager blockManager,
+      IndexShuffleBlockResolver shuffleBlockResolver,
+      TaskMemoryManager memoryManager,
+      ShuffleMemoryManager shuffleMemoryManager,
+      UnsafeShuffleHandle<K, V> handle,
+      int mapId,
+      TaskContext taskContext,
+      SparkConf sparkConf) throws IOException {
+    final int numPartitions = handle.dependency().partitioner().numPartitions();
+    if (numPartitions > UnsafeShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS()) {
+      throw new IllegalArgumentException(
+        "UnsafeShuffleWriter can only be used for shuffles with at most " +
+          UnsafeShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS() + " reduce partitions");
+    }
+    this.blockManager = blockManager;
+    this.shuffleBlockResolver = shuffleBlockResolver;
+    this.memoryManager = memoryManager;
+    this.shuffleMemoryManager = shuffleMemoryManager;
+    this.mapId = mapId;
+    final ShuffleDependency<K, V, V> dep = handle.dependency();
+    this.shuffleId = dep.shuffleId();
+    this.serializer = Serializer.getSerializer(dep.serializer()).newInstance();
+    this.partitioner = dep.partitioner();
+    this.writeMetrics = new ShuffleWriteMetrics();
+    taskContext.taskMetrics().shuffleWriteMetrics_$eq(Option.apply(writeMetrics));
+    this.taskContext = taskContext;
+    this.sparkConf = sparkConf;
+    this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
+    open();
+  }
+
+  /**
+   * This convenience method should only be called in test code.
+   */
+  @VisibleForTesting
+  public void write(Iterator<Product2<K, V>> records) throws IOException {
+    write(JavaConversions.asScalaIterator(records));
+  }
+
+  @Override
+  public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
+    boolean success = false;
+    try {
+      while (records.hasNext()) {
+        insertRecordIntoSorter(records.next());
+      }
+      closeAndWriteOutput();
+      success = true;
+    } finally {
+      if (!success) {
+        sorter.cleanupAfterError();
+      }
+    }
+  }
+
+  private void open() throws IOException {
+    assert (sorter == null);
+    sorter = new UnsafeShuffleExternalSorter(
+      memoryManager,
+      shuffleMemoryManager,
+      blockManager,
+      taskContext,
+      INITIAL_SORT_BUFFER_SIZE,
+      partitioner.numPartitions(),
+      sparkConf,
+      writeMetrics);
+    serBuffer = new MyByteArrayOutputStream(1024 * 1024);
+    serOutputStream = serializer.serializeStream(serBuffer);
+  }
+
+  @VisibleForTesting
+  void closeAndWriteOutput() throws IOException {
+    serBuffer = null;
+    serOutputStream = null;
+    final SpillInfo[] spills = sorter.closeAndGetSpills();
+    sorter = null;
+    final long[] partitionLengths;
+    try {
+      partitionLengths = mergeSpills(spills);
+    } finally {
+      for (SpillInfo spill : spills) {
+        if (spill.file.exists() && !spill.file.delete()) {
+          logger.error("Error while deleting spill file {}", spill.file.getPath());
+        }
+      }
+    }
+    shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
+    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
+  }
+
+  @VisibleForTesting
+  void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
+    final K key = record._1();
+    final int partitionId = partitioner.getPartition(key);
+    serBuffer.reset();
+    serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
+    serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
+    serOutputStream.flush();
+
+    final int serializedRecordSize = serBuffer.size();
+    assert (serializedRecordSize > 0);
+
+    sorter.insertRecord(
+      serBuffer.getBuf(), PlatformDependent.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
+  }
+
+  @VisibleForTesting
+  void forceSorterToSpill() throws IOException {
+    assert (sorter != null);
+    sorter.spill();
+  }
+
+  /**
+   * Merge zero or more spill files together, choosing the fastest merging strategy based on the
+   * number of spills and the IO compression codec.
+   *
+   * @return the partition lengths in the merged file.
+   */
+  private long[] mergeSpills(SpillInfo[] spills) throws IOException {
+    final File outputFile = shuffleBlockResolver.getDataFile(shuffleId, mapId);
+    final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true);
+    final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
+    final boolean fastMergeEnabled =
+      sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);
+    final boolean fastMergeIsSupported =
+      !compressionEnabled || compressionCodec instanceof LZFCompressionCodec;
+    try {
+      if (spills.length == 0) {
+        new FileOutputStream(outputFile).close(); // Create an empty file
+        return new long[partitioner.numPartitions()];
+      } else if (spills.length == 1) {
+        // Here, we don't need to perform any metrics updates because the bytes written to this
+        // output file would have already been counted as shuffle bytes written.
+        Files.move(spills[0].file, outputFile);
+        return spills[0].partitionLengths;
+      } else {
+        final long[] partitionLengths;
+        // There are multiple spills to merge, so none of these spill files' lengths were counted
+        // towards our shuffle write count or shuffle write time. If we use the slow merge path,
+        // then the final output file's size won't necessarily be equal to the sum of the spill
+        // files' sizes. To guard against this case, we look at the output file's actual size when
+        // computing shuffle bytes written.
+        //
+        // We allow the individual merge methods to report their own IO times since different merge
+        // strategies use different IO techniques.  We count IO during merge towards the
+        // shuffle write time, which appears to be consistent with the "not bypassing merge-sort"
+        // branch in ExternalSorter.
+        if (fastMergeEnabled && fastMergeIsSupported) {
+          // Compression is disabled or we are using an IO compression codec that supports
+          // decompression of concatenated compressed streams, so we can perform a fast spill merge
+          // that doesn't need to interpret the spilled bytes.
+          if (transferToEnabled) {
+            logger.debug("Using transferTo-based fast merge");
+            partitionLengths = mergeSpillsWithTransferTo(spills, outputFile);
+          } else {
+            logger.debug("Using fileStream-based fast merge");
+            partitionLengths = mergeSpillsWithFileStream(spills, outputFile, null);
+          }
+        } else {
+          logger.debug("Using slow merge");
+          partitionLengths = mergeSpillsWithFileStream(spills, outputFile, compressionCodec);
+        }
+        // When closing an UnsafeShuffleExternalSorter that has already spilled once but also has
+        // in-memory records, we write out the in-memory records to a file but do not count that
+        // final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs
+        // to be counted as shuffle write, but this will lead to double-counting of the final
+        // SpillInfo's bytes.
+        writeMetrics.decShuffleBytesWritten(spills[spills.length - 1].file.length());
+        writeMetrics.incShuffleBytesWritten(outputFile.length());
+        return partitionLengths;
+      }
+    } catch (IOException e) {
+      if (outputFile.exists() && !outputFile.delete()) {
+        logger.error("Unable to delete output file {}", outputFile.getPath());
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Merges spill files using Java FileStreams. This code path is slower than the NIO-based merge,
+   * {@link UnsafeShuffleWriter#mergeSpillsWithTransferTo(SpillInfo[], File)}, so it's only used in
+   * cases where the IO compression codec does not support concatenation of compressed data, or in
+   * cases where users have explicitly disabled use of {@code transferTo} in order to work around
+   * kernel bugs.
+   *
+   * @param spills the spills to merge.
+   * @param outputFile the file to write the merged data to.
+   * @param compressionCodec the IO compression codec, or null if shuffle compression is disabled.
+   * @return the partition lengths in the merged file.
+   */
+  private long[] mergeSpillsWithFileStream(
+      SpillInfo[] spills,
+      File outputFile,
+      @Nullable CompressionCodec compressionCodec) throws IOException {
+    assert (spills.length >= 2);
+    final int numPartitions = partitioner.numPartitions();
+    final long[] partitionLengths = new long[numPartitions];
+    final InputStream[] spillInputStreams = new FileInputStream[spills.length];
+    OutputStream mergedFileOutputStream = null;
+
+    boolean threwException = true;
+    try {
+      for (int i = 0; i < spills.length; i++) {
+        spillInputStreams[i] = new FileInputStream(spills[i].file);
+      }
+      for (int partition = 0; partition < numPartitions; partition++) {
+        final long initialFileLength = outputFile.length();
+        mergedFileOutputStream =
+          new TimeTrackingOutputStream(writeMetrics, new FileOutputStream(outputFile, true));
+        if (compressionCodec != null) {
+          mergedFileOutputStream = compressionCodec.compressedOutputStream(mergedFileOutputStream);
+        }
+
+        for (int i = 0; i < spills.length; i++) {
+          final long partitionLengthInSpill = spills[i].partitionLengths[partition];
+          if (partitionLengthInSpill > 0) {
+            InputStream partitionInputStream =
+              new LimitedInputStream(spillInputStreams[i], partitionLengthInSpill);
+            if (compressionCodec != null) {
+              partitionInputStream = compressionCodec.compressedInputStream(partitionInputStream);
+            }
+            ByteStreams.copy(partitionInputStream, mergedFileOutputStream);
+          }
+        }
+        mergedFileOutputStream.flush();
+        mergedFileOutputStream.close();
+        partitionLengths[partition] = (outputFile.length() - initialFileLength);
+      }
+      threwException = false;
+    } finally {
+      // To avoid masking exceptions that caused us to prematurely enter the finally block, only
+      // throw exceptions during cleanup if threwException == false.
+      for (InputStream stream : spillInputStreams) {
+        Closeables.close(stream, threwException);
+      }
+      Closeables.close(mergedFileOutputStream, threwException);
+    }
+    return partitionLengths;
+  }
+
+  /**
+   * Merges spill files by using NIO's transferTo to concatenate spill partitions' bytes.
+   * This is only safe when the IO compression codec and serializer support concatenation of
+   * serialized streams.
+   *
+   * @return the partition lengths in the merged file.
+   */
+  private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) throws IOException {
+    assert (spills.length >= 2);
+    final int numPartitions = partitioner.numPartitions();
+    final long[] partitionLengths = new long[numPartitions];
+    final FileChannel[] spillInputChannels = new FileChannel[spills.length];
+    final long[] spillInputChannelPositions = new long[spills.length];
+    FileChannel mergedFileOutputChannel = null;
+
+    boolean threwException = true;
+    try {
+      for (int i = 0; i < spills.length; i++) {
+        spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel();
+      }
+      // This file needs to be opened in append mode in order to work around a Linux kernel bug
+      // that affects transferTo; see SPARK-3948 for more details.
+      mergedFileOutputChannel = new FileOutputStream(outputFile, true).getChannel();
+
+      long bytesWrittenToMergedFile = 0;
+      for (int partition = 0; partition < numPartitions; partition++) {
+        for (int i = 0; i < spills.length; i++) {
+          final long partitionLengthInSpill = spills[i].partitionLengths[partition];
+          long bytesToTransfer = partitionLengthInSpill;
+          final FileChannel spillInputChannel = spillInputChannels[i];
+          final long writeStartTime = System.nanoTime();
+          while (bytesToTransfer > 0) {
+            final long actualBytesTransferred = spillInputChannel.transferTo(
+              spillInputChannelPositions[i],
+              bytesToTransfer,
+              mergedFileOutputChannel);
+            spillInputChannelPositions[i] += actualBytesTransferred;
+            bytesToTransfer -= actualBytesTransferred;
+          }
+          writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
+          bytesWrittenToMergedFile += partitionLengthInSpill;
+          partitionLengths[partition] += partitionLengthInSpill;
+        }
+      }
+      // Check the position after transferTo loop to see if it is in the right position and raise an
+      // exception if it is incorrect. The position will not be increased to the expected length
+      // after calling transferTo in kernel version 2.6.32. This issue is described at
+      // https://bugs.openjdk.java.net/browse/JDK-7052359 and SPARK-3948.
+      if (mergedFileOutputChannel.position() != bytesWrittenToMergedFile) {
+        throw new IOException(
+          "Current position " + mergedFileOutputChannel.position() + " does not equal expected " +
+            "position " + bytesWrittenToMergedFile + " after transferTo. Please check your kernel" +
+            " version to see if it is 2.6.32, as there is a kernel bug which will lead to " +
+            "unexpected behavior when using transferTo. You can set spark.file.transferTo=false " +
+            "to disable this NIO feature."
+        );
+      }
+      threwException = false;
+    } finally {
+      // To avoid masking exceptions that caused us to prematurely enter the finally block, only
+      // throw exceptions during cleanup if threwException == false.
+      for (int i = 0; i < spills.length; i++) {
+        assert(spillInputChannelPositions[i] == spills[i].file.length());
+        Closeables.close(spillInputChannels[i], threwException);
+      }
+      Closeables.close(mergedFileOutputChannel, threwException);
+    }
+    return partitionLengths;
+  }
+
+  @Override
+  public Option<MapStatus> stop(boolean success) {
+    try {
+      if (stopping) {
+        return Option.apply(null);
+      } else {
+        stopping = true;
+        if (success) {
+          if (mapStatus == null) {
+            throw new IllegalStateException("Cannot call stop(true) without having called write()");
+          }
+          return Option.apply(mapStatus);
+        } else {
+          // The map task failed, so delete our output data.
+          shuffleBlockResolver.removeDataByMap(shuffleId, mapId);
+          return Option.apply(null);
+        }
+      }
+    } finally {
+      if (sorter != null) {
+        // If sorter is non-null, then this implies that we called stop() in response to an error,
+        // so we need to clean up memory and spill files created by the sorter
+        sorter.cleanupAfterError();
+      }
+    }
+  }
+}
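
The transferTo-based fast merge above can be illustrated outside Spark. The following is a minimal sketch, not the patch's implementation: `TransferToMergeSketch`, `merge`, and the `partitionLengths[i][p]` layout (bytes that spill `i` holds for partition `p`) are hypothetical names chosen for illustration. Unlike the patch, it opens the output file without append mode and adds a guard against a stalled transfer.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

// Hypothetical illustration class; not part of the patch.
final class TransferToMergeSketch {

  /**
   * Concatenates spill files partition by partition without interpreting their bytes.
   * partitionLengths[i][p] is the number of bytes that spill i holds for partition p.
   * Returns the length of each partition in the merged file.
   */
  static long[] merge(File[] spills, long[][] partitionLengths, File output) throws IOException {
    final int numPartitions = partitionLengths[0].length;
    final long[] mergedLengths = new long[numPartitions];
    final long[] positions = new long[spills.length];  // read offset within each spill
    final FileChannel[] inputs = new FileChannel[spills.length];
    // Assumption: plain (non-append) output; the patch uses append mode as a kernel workaround.
    try (FileChannel out = new FileOutputStream(output).getChannel()) {
      for (int i = 0; i < spills.length; i++) {
        inputs[i] = new FileInputStream(spills[i]).getChannel();
      }
      for (int p = 0; p < numPartitions; p++) {
        for (int i = 0; i < spills.length; i++) {
          long remaining = partitionLengths[i][p];
          // transferTo may move fewer bytes than requested, so loop until done.
          while (remaining > 0) {
            final long moved = inputs[i].transferTo(positions[i], remaining, out);
            if (moved <= 0) {
              // Guard added for this sketch: avoids spinning if the lengths are wrong.
              throw new IOException("Spill " + i + " is shorter than its recorded lengths");
            }
            positions[i] += moved;
            remaining -= moved;
          }
          mergedLengths[p] += partitionLengths[i][p];
        }
      }
    } finally {
      for (FileChannel in : inputs) {
        if (in != null) {
          in.close();
        }
      }
    }
    return mergedLengths;
  }
}
```

Because each spill file is already sorted by partition id, reading every spill sequentially from its own offset yields a merged file in which each partition's bytes are contiguous.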

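The fast merge path relies on the codec being able to decompress concatenated compressed streams. As a rough illustration of that property only, the sketch below uses the JDK's gzip classes as a stand-in; it does not use Spark's `CompressionCodec`, and `ConcatenatedStreamsSketch` and its methods are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical illustration class; gzip stands in for a concatenation-friendly codec.
final class ConcatenatedStreamsSketch {

  /** Compresses the text as one independent gzip stream. */
  static byte[] compress(String text) throws IOException {
    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
      gzip.write(text.getBytes(StandardCharsets.UTF_8));
    }
    return bytes.toByteArray();
  }

  /** Joins two compressed streams byte for byte, without decompressing either one. */
  static byte[] concat(byte[] first, byte[] second) {
    final byte[] joined = new byte[first.length + second.length];
    System.arraycopy(first, 0, joined, 0, first.length);
    System.arraycopy(second, 0, joined, first.length, second.length);
    return joined;
  }

  /** Decompresses with a single reader, which continues across stream boundaries. */
  static String decompress(byte[] compressed) throws IOException {
    final ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
      final byte[] buffer = new byte[4096];
      int n;
      while ((n = in.read(buffer)) != -1) {
        out.write(buffer, 0, n);
      }
    }
    return new String(out.toByteArray(), StandardCharsets.UTF_8);
  }
}
```

When a codec behaves this way, the per-partition segments from different spills can be appended as raw bytes and a reducer still reads them as one logical stream. When it does not, the merge has to decompress and recompress, which is what the slow path does.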
http://git-wip-us.apache.org/repos/asf/spark/blob/73bed408/core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java b/core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java
new file mode 100644
index 0000000..dc2aa30
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.spark.annotation.Private;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+
+/**
+ * Intercepts write calls and tracks total time spent writing in order to update shuffle write
+ * metrics. Not thread safe.
+ */
+@Private
+public final class TimeTrackingOutputStream extends OutputStream {
+
+  private final ShuffleWriteMetrics writeMetrics;
+  private final OutputStream outputStream;
+
+  public TimeTrackingOutputStream(ShuffleWriteMetrics writeMetrics, OutputStream outputStream) {
+    this.writeMetrics = writeMetrics;
+    this.outputStream = outputStream;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    final long startTime = System.nanoTime();
+    outputStream.write(b);
+    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+  }
+
+  @Override
+  public void write(byte[] b) throws IOException {
+    final long startTime = System.nanoTime();
+    outputStream.write(b);
+    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    final long startTime = System.nanoTime();
+    outputStream.write(b, off, len);
+    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    final long startTime = System.nanoTime();
+    outputStream.flush();
+    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+  }
+
+  @Override
+  public void close() throws IOException {
+    final long startTime = System.nanoTime();
+    outputStream.close();
+    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+  }
+}
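
For readers who want to try the timing decorator without Spark on the classpath, here is a minimal sketch under stated assumptions: `TimedOutputStreamSketch` is a hypothetical stand-in that accumulates nanoseconds in a local field rather than in `ShuffleWriteMetrics`, and it records elapsed time in a `finally` block, which the class above does not do.

```java
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical stand-in for the timing decorator; not thread safe.
final class TimedOutputStreamSketch extends OutputStream {

  private final OutputStream delegate;
  private long nanosSpentWriting = 0L;

  TimedOutputStreamSketch(OutputStream delegate) {
    this.delegate = delegate;
  }

  /** Total time spent inside the delegate's write, flush, and close calls. */
  long nanosSpentWriting() {
    return nanosSpentWriting;
  }

  @Override
  public void write(int b) throws IOException {
    final long start = System.nanoTime();
    try {
      delegate.write(b);
    } finally {
      nanosSpentWriting += System.nanoTime() - start;
    }
  }

  // write(byte[]) is inherited and forwards to this method.
  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    final long start = System.nanoTime();
    try {
      delegate.write(b, off, len);
    } finally {
      nanosSpentWriting += System.nanoTime() - start;
    }
  }

  @Override
  public void flush() throws IOException {
    final long start = System.nanoTime();
    try {
      delegate.flush();
    } finally {
      nanosSpentWriting += System.nanoTime() - start;
    }
  }

  @Override
  public void close() throws IOException {
    final long start = System.nanoTime();
    try {
      delegate.close();
    } finally {
      nanosSpentWriting += System.nanoTime() - start;
    }
  }
}
```

Overriding the bulk `write(byte[], int, int)` matters: without it, `OutputStream` falls back to one `write(int)` call per byte, which would be slow and would inflate the measured time.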

http://git-wip-us.apache.org/repos/asf/spark/blob/73bed408/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 0c4d28f..a5d831c 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -313,7 +313,8 @@ object SparkEnv extends Logging {
     // Let the user specify short names for shuffle managers
     val shortShuffleMgrNames = Map(
       "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
-      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
+      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
+      "tungsten-sort" -> "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager")
     val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
     val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
     val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
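
The short-name lookup in this hunk can be sketched in Java as follows. This is an illustration only: `ShuffleManagerNameSketch` and `resolve` are hypothetical names, and the sketch lowercases with `Locale.ROOT`, whereas the Scala code calls `toLowerCase` with the default locale. The class names in the map are the ones shown in the hunk.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical illustration class; mirrors the lookup logic only.
final class ShuffleManagerNameSketch {

  private static final Map<String, String> SHORT_NAMES = new HashMap<>();

  static {
    SHORT_NAMES.put("hash", "org.apache.spark.shuffle.hash.HashShuffleManager");
    SHORT_NAMES.put("sort", "org.apache.spark.shuffle.sort.SortShuffleManager");
    SHORT_NAMES.put("tungsten-sort", "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager");
  }

  /**
   * Maps a configured value to a class name. Known short names are matched
   * case-insensitively; anything else is treated as a fully qualified class name
   * and returned unchanged.
   */
  static String resolve(String configured) {
    return SHORT_NAMES.getOrDefault(configured.toLowerCase(Locale.ROOT), configured);
  }
}
```

With this lookup, `resolve("tungsten-sort")` yields the `UnsafeShuffleManager` class name, while an unrecognized value such as a custom class name passes through untouched.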

http://git-wip-us.apache.org/repos/asf/spark/blob/73bed408/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index dfbde7c..698d138 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -121,6 +121,8 @@ class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable {
   private var counterReset = conf.getInt("spark.serializer.objectStreamReset", 100)
   private var extraDebugInfo = conf.getBoolean("spark.serializer.extraDebugInfo", true)
 
+  protected def this() = this(new SparkConf())  // For deserialization only
+
   override def newInstance(): SerializerInstance = {
     val classLoader = defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader)
     new JavaSerializerInstance(counterReset, extraDebugInfo, classLoader)

http://git-wip-us.apache.org/repos/asf/spark/blob/73bed408/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
index 6ad427b..6c3b308 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -76,7 +76,7 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
   private val consolidateShuffleFiles =
     conf.getBoolean("spark.shuffle.consolidateFiles", false)
 
-  // Use getSizeAsKb (not bytes) to maintain backwards compatibility of on units are provided 
+  // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
   private val bufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/73bed408/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
index f6e6fe5..4cc4ef5 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
@@ -17,14 +17,17 @@
 
 package org.apache.spark.shuffle
 
+import java.io.IOException
+
 import org.apache.spark.scheduler.MapStatus
 
 /**
  * Obtained inside a map task to write out records to the shuffle system.
  */
-private[spark] trait ShuffleWriter[K, V] {
+private[spark] abstract class ShuffleWriter[K, V] {
   /** Write a sequence of records to this task's output */
-  def write(records: Iterator[_ <: Product2[K, V]]): Unit
+  @throws[IOException]
+  def write(records: Iterator[Product2[K, V]]): Unit
 
   /** Close this writer, passing along whether the map completed */
   def stop(success: Boolean): Option[MapStatus]

http://git-wip-us.apache.org/repos/asf/spark/blob/73bed408/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index 897f0a5..eb87cee 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -49,7 +49,7 @@ private[spark] class HashShuffleWriter[K, V](
     writeMetrics)
 
   /** Write a bunch of records to this task's output */
-  override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
+  override def write(records: Iterator[Product2[K, V]]): Unit = {
     val iter = if (dep.aggregator.isDefined) {
       if (dep.mapSideCombine) {
         dep.aggregator.get.combineValuesByKey(records, context)

http://git-wip-us.apache.org/repos/asf/spark/blob/73bed408/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index 1584294..d7fab35 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -72,7 +72,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
     true
   }
 
-  override def shuffleBlockResolver: IndexShuffleBlockResolver = {
+  override val shuffleBlockResolver: IndexShuffleBlockResolver = {
     indexShuffleBlockResolver
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/73bed408/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index add2656..c9dd6bf 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -48,7 +48,7 @@ private[spark] class SortShuffleWriter[K, V, C](
   context.taskMetrics.shuffleWriteMetrics = Some(writeMetrics)
 
   /** Write a bunch of records to this task's output */
-  override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
+  override def write(records: Iterator[Product2[K, V]]): Unit = {
     if (dep.mapSideCombine) {
       require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
       sorter = new ExternalSorter[K, V, C](



