spark-commits mailing list archives

From r...@apache.org
Subject [3/3] git commit: [SPARK-3796] Create external service which can serve shuffle files
Date Sat, 01 Nov 2014 21:37:51 GMT
[SPARK-3796] Create external service which can serve shuffle files

This patch introduces the tooling necessary to construct an external shuffle service which is independent of Spark executors, and then use this service inside Spark. An example (just for the sake of this PR) of the service creation can be found in Worker, and the service itself is used by plugging in the StandaloneShuffleClient as Spark's ShuffleClient (set up in BlockManager).

This PR continues the work from #2753, which extracted the transport layer of Spark's block transfer into an independent package within Spark. A new package was created which contains the Spark business logic necessary to retrieve the actual shuffle data, and which is completely independent of the transport layer introduced in the previous patch. Like the transport layer, this package must not depend on Spark, as we anticipate plugging this service in as a lightweight process within, say, the YARN NodeManager, and do not wish to include Spark's dependencies (including Scala itself).
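
As a rough illustration of the client-side contract this package exposes, the sketch below mirrors the fetchBlocks(host, port, execId, blockIds, listener) signature and the two BlockFetchingListener callbacks that appear in the diff. It is not the code from this patch: byte[] stands in for ManagedBuffer, and InMemoryShuffleClient, register, demo, the host, the port and the executor id are all hypothetical names and values invented for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShuffleClientSketch {

  /** Callback invoked once per requested block; byte[] stands in for ManagedBuffer. */
  interface BlockFetchingListener {
    void onBlockFetchSuccess(String blockId, byte[] data);
    void onBlockFetchFailure(String blockId, Throwable exception);
  }

  /** Fetches blocks written by executor execId from whatever serves them at host:port. */
  interface ShuffleClient {
    void fetchBlocks(String host, int port, String execId, String[] blockIds,
        BlockFetchingListener listener);
  }

  /** Hypothetical in-memory stand-in for a remote service; keys are "execId/blockId". */
  static class InMemoryShuffleClient implements ShuffleClient {
    private final Map<String, byte[]> blocks = new HashMap<>();

    void register(String execId, String blockId, byte[] data) {
      blocks.put(execId + "/" + blockId, data);
    }

    @Override
    public void fetchBlocks(String host, int port, String execId, String[] blockIds,
        BlockFetchingListener listener) {
      // host and port are ignored here; a real client would open a connection to them.
      for (String blockId : blockIds) {
        byte[] data = blocks.get(execId + "/" + blockId);
        if (data != null) {
          listener.onBlockFetchSuccess(blockId, data);
        } else {
          listener.onBlockFetchFailure(blockId,
              new IllegalStateException("No such block: " + blockId));
        }
      }
    }
  }

  /** Fetches one registered and one missing block, recording which callback fired. */
  static List<String> demo() {
    InMemoryShuffleClient client = new InMemoryShuffleClient();
    client.register("exec-1", "shuffle_0_0_0", "hello".getBytes(StandardCharsets.UTF_8));

    List<String> events = new ArrayList<>();
    // "localhost" and 12345 are arbitrary placeholder values.
    client.fetchBlocks("localhost", 12345, "exec-1",
        new String[] {"shuffle_0_0_0", "shuffle_0_1_0"},
        new BlockFetchingListener() {
          @Override
          public void onBlockFetchSuccess(String blockId, byte[] data) {
            events.add("ok:" + blockId + ":" + new String(data, StandardCharsets.UTF_8));
          }

          @Override
          public void onBlockFetchFailure(String blockId, Throwable exception) {
            events.add("fail:" + blockId);
          }
        });
    return events;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

The execId argument is what lets a service that outlives any single executor decide whose files to serve. An executor serving its own blocks can presumably ignore it.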

There are several outstanding tasks which must be completed before this PR can be merged:
- [x] Complete unit testing of network/shuffle package.
- [x] Performance and correctness testing on a real cluster.
- [x] Remove example service instantiation from Worker.scala.

There are even more shortcomings of this PR which should be addressed in follow-up patches:
- Don't use Java serializer for RPC layer! It is not cross-version compatible.
- Handle shuffle file cleanup for dead executors once the application terminates or the ContextCleaner triggers.
- Documentation of the feature in the Spark docs.
- Improve behavior if the shuffle service itself goes down (right now we don't blacklist it, and new executors cannot spawn on that machine).
- SSL and SASL integration
- Nice to have: Handle shuffle file consolidation (this would require changes to Spark's implementation).

Author: Aaron Davidson <aaron@databricks.com>

Closes #3001 from aarondav/shuffle-service and squashes the following commits:

4d1f8c1 [Aaron Davidson] Remove changes to Worker
705748f [Aaron Davidson] Rename Standalone* to External*
fd3928b [Aaron Davidson] Do not unregister executor outputs unduly
9883918 [Aaron Davidson] Make suggested build changes
3d62679 [Aaron Davidson] Add Spark integration test
7fe51d5 [Aaron Davidson] Fix SBT integration
56caa50 [Aaron Davidson] Address comments
c8d1ac3 [Aaron Davidson] Add unit tests
2f70c0c [Aaron Davidson] Fix unit tests
5483e96 [Aaron Davidson] Fix unit tests
46a70bf [Aaron Davidson] Whoops, bracket
5ea4df6 [Aaron Davidson] [SPARK-3796] Create external service which can serve shuffle files


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f55218ae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f55218ae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f55218ae

Branch: refs/heads/master
Commit: f55218aeb1e9d638df6229b36a59a15ce5363482
Parents: 1d4f355
Author: Aaron Davidson <aaron@databricks.com>
Authored: Sat Nov 1 14:37:45 2014 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Sat Nov 1 14:37:45 2014 -0700

----------------------------------------------------------------------
 core/pom.xml                                    |   5 +
 .../org/apache/spark/MapOutputTracker.scala     |   4 +-
 .../main/scala/org/apache/spark/SparkEnv.scala  |   2 +-
 .../org/apache/spark/deploy/worker/Worker.scala |   2 +-
 .../org/apache/spark/executor/Executor.scala    |   2 +-
 .../spark/network/BlockFetchingListener.scala   |  42 ---
 .../spark/network/BlockTransferService.scala    |  55 ++--
 .../spark/network/netty/NettyBlockFetcher.scala |  95 ------
 .../network/netty/NettyBlockRpcServer.scala     |  26 +-
 .../netty/NettyBlockTransferService.scala       |  29 +-
 .../network/netty/SparkTransportConf.scala      |  32 ++
 .../network/nio/NioBlockTransferService.scala   |   8 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |  41 ++-
 .../org/apache/spark/scheduler/Stage.scala      |  10 +
 .../apache/spark/scheduler/TaskSetManager.scala |   5 +-
 .../spark/shuffle/FileShuffleBlockManager.scala |   3 +-
 .../shuffle/IndexShuffleBlockManager.scala      |   2 +
 .../shuffle/hash/BlockStoreShuffleFetcher.scala |   2 +-
 .../spark/shuffle/hash/HashShuffleWriter.scala  |   2 +-
 .../spark/shuffle/sort/SortShuffleWriter.scala  |   2 +-
 .../org/apache/spark/storage/BlockId.scala      |   2 +
 .../org/apache/spark/storage/BlockManager.scala |  71 ++++-
 .../apache/spark/storage/DiskBlockManager.scala |  25 +-
 .../storage/ShuffleBlockFetcherIterator.scala   |  14 +-
 .../scala/org/apache/spark/util/Utils.scala     |   2 +
 .../org/apache/spark/DistributedSuite.scala     |   3 +-
 .../spark/ExternalShuffleServiceSuite.scala     |  76 +++++
 .../org/apache/spark/HashShuffleSuite.scala     |   6 +-
 .../org/apache/spark/ShuffleNettySuite.scala    |   6 +-
 .../scala/org/apache/spark/ShuffleSuite.scala   |  34 +--
 .../org/apache/spark/SortShuffleSuite.scala     |   6 +-
 .../spark/metrics/MetricsSystemSuite.scala      |   4 +-
 .../ShuffleBlockFetcherIteratorSuite.scala      |  25 +-
 network/common/pom.xml                          |  20 +-
 .../apache/spark/network/TransportContext.java  |  14 +-
 .../spark/network/client/TransportClient.java   |  32 +-
 .../network/client/TransportClientFactory.java  |  17 +-
 .../network/server/DefaultStreamManager.java    | 104 -------
 .../spark/network/server/NoOpRpcHandler.java    |  38 +++
 .../network/server/OneForOneStreamManager.java  | 104 +++++++
 .../apache/spark/network/server/RpcHandler.java |   6 +
 .../network/server/TransportRequestHandler.java |   9 +-
 .../spark/network/server/TransportServer.java   |   8 +-
 .../apache/spark/network/util/JavaUtils.java    |  38 +++
 .../util/SystemPropertyConfigProvider.java      |  34 +++
 .../spark/network/util/TransportConf.java       |   3 -
 .../network/ChunkFetchIntegrationSuite.java     |  16 +-
 .../apache/spark/network/NoOpRpcHandler.java    |  28 --
 .../spark/network/RpcIntegrationSuite.java      |   9 +-
 .../network/SystemPropertyConfigProvider.java   |  34 ---
 .../network/TransportClientFactorySuite.java    |   7 +-
 network/shuffle/pom.xml                         |  96 ++++++
 .../network/shuffle/BlockFetchingListener.java  |  36 +++
 .../network/shuffle/ExecutorShuffleInfo.java    |  64 ++++
 .../shuffle/ExternalShuffleBlockHandler.java    | 102 +++++++
 .../shuffle/ExternalShuffleBlockManager.java    | 154 ++++++++++
 .../network/shuffle/ExternalShuffleClient.java  |  88 ++++++
 .../shuffle/ExternalShuffleMessages.java        | 106 +++++++
 .../network/shuffle/OneForOneBlockFetcher.java  | 121 ++++++++
 .../spark/network/shuffle/ShuffleClient.java    |  35 +++
 .../network/shuffle/ShuffleStreamHandle.java    |  60 ++++
 .../ExternalShuffleBlockHandlerSuite.java       | 123 ++++++++
 .../ExternalShuffleBlockManagerSuite.java       | 125 ++++++++
 .../ExternalShuffleIntegrationSuite.java        | 291 +++++++++++++++++++
 .../shuffle/OneForOneBlockFetcherSuite.java     | 167 +++++++++++
 .../network/shuffle/ShuffleMessagesSuite.java   |  51 ++++
 .../network/shuffle/TestShuffleDataContext.java | 107 +++++++
 pom.xml                                         |   1 +
 project/SparkBuild.scala                        |  11 +-
 69 files changed, 2403 insertions(+), 499 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 6963ce4..41296e0 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -50,6 +50,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-network-shuffle_2.10</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>net.java.dev.jets3t</groupId>
       <artifactId>jets3t</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 4cb0bd4..7d96962 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -178,6 +178,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
           return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
         }
       } else {
+        logError("Missing all output locations for shuffle " + shuffleId)
         throw new MetadataFetchFailedException(
           shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId)
       }
@@ -348,7 +349,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
     new ConcurrentHashMap[Int, Array[MapStatus]]
 }
 
-private[spark] object MapOutputTracker {
+private[spark] object MapOutputTracker extends Logging {
 
   // Serialize an array of map output locations into an efficient byte format so that we can send
   // it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
@@ -381,6 +382,7 @@ private[spark] object MapOutputTracker {
     statuses.map {
       status =>
         if (status == null) {
+          logError("Missing an output location for shuffle " + shuffleId)
           throw new MetadataFetchFailedException(
             shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId)
         } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 16c5d66..e2f13ac 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -32,7 +32,7 @@ import org.apache.spark.api.python.PythonWorkerFactory
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.network.BlockTransferService
-import org.apache.spark.network.netty.{NettyBlockTransferService}
+import org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.network.nio.NioBlockTransferService
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.Serializer

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index c4a8ec2..f1f66d0 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -186,11 +186,11 @@ private[spark] class Worker(
   private def retryConnectToMaster() {
     Utils.tryOrExit {
       connectionAttemptCount += 1
-      logInfo(s"Attempting to connect to master (attempt # $connectionAttemptCount")
       if (registered) {
         registrationRetryTimer.foreach(_.cancel())
         registrationRetryTimer = None
       } else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) {
+        logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)")
         tryRegisterAllMasters()
         if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
           registrationRetryTimer.foreach(_.cancel())

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 2889f59..c78e0ff 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -78,7 +78,7 @@ private[spark] class Executor(
   val executorSource = new ExecutorSource(this, executorId)
 
   // Initialize Spark environment (using system properties read above)
-  conf.set("spark.executor.id", "executor." + executorId)
+  conf.set("spark.executor.id", executorId)
   private val env = {
     if (!isLocal) {
       val port = conf.getInt("spark.executor.port", 0)

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala b/core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala
deleted file mode 100644
index 645793f..0000000
--- a/core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network
-
-import java.util.EventListener
-
-import org.apache.spark.network.buffer.ManagedBuffer
-
-
-/**
- * Listener callback interface for [[BlockTransferService.fetchBlocks]].
- */
-private[spark]
-trait BlockFetchingListener extends EventListener {
-
-  /**
-   * Called once per successfully fetched block. After this call returns, data will be released
-   * automatically. If the data will be passed to another thread, the receiver should retain()
-   * and release() the buffer on their own, or copy the data to a new buffer.
-   */
-  def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit
-
-  /**
-   * Called at least once per block upon failures.
-   */
-  def onBlockFetchFailure(blockId: String, exception: Throwable): Unit
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
index b083f46..210a581 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -20,16 +20,16 @@ package org.apache.spark.network
 import java.io.Closeable
 import java.nio.ByteBuffer
 
-import scala.concurrent.{Await, Future}
+import scala.concurrent.{Promise, Await, Future}
 import scala.concurrent.duration.Duration
 
 import org.apache.spark.Logging
 import org.apache.spark.network.buffer.{NioManagedBuffer, ManagedBuffer}
-import org.apache.spark.storage.{BlockId, StorageLevel}
-import org.apache.spark.util.Utils
+import org.apache.spark.network.shuffle.{ShuffleClient, BlockFetchingListener}
+import org.apache.spark.storage.{BlockManagerId, BlockId, StorageLevel}
 
 private[spark]
-abstract class BlockTransferService extends Closeable with Logging {
+abstract class BlockTransferService extends ShuffleClient with Closeable with Logging {
 
   /**
    * Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
@@ -60,10 +60,11 @@ abstract class BlockTransferService extends Closeable with Logging {
    * return a future so the underlying implementation can invoke onBlockFetchSuccess as soon as
    * the data of a block is fetched, rather than waiting for all blocks to be fetched.
    */
-  def fetchBlocks(
-      hostName: String,
+  override def fetchBlocks(
+      host: String,
       port: Int,
-      blockIds: Seq[String],
+      execId: String,
+      blockIds: Array[String],
       listener: BlockFetchingListener): Unit
 
   /**
@@ -81,43 +82,23 @@ abstract class BlockTransferService extends Closeable with Logging {
    *
    * It is also only available after [[init]] is invoked.
    */
-  def fetchBlockSync(hostName: String, port: Int, blockId: String): ManagedBuffer = {
+  def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = {
     // A monitor for the thread to wait on.
-    val lock = new Object
-    @volatile var result: Either[ManagedBuffer, Throwable] = null
-    fetchBlocks(hostName, port, Seq(blockId), new BlockFetchingListener {
-      override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
-        lock.synchronized {
-          result = Right(exception)
-          lock.notify()
+    val result = Promise[ManagedBuffer]()
+    fetchBlocks(host, port, execId, Array(blockId),
+      new BlockFetchingListener {
+        override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
+          result.failure(exception)
         }
-      }
-      override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
-        lock.synchronized {
+        override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
           val ret = ByteBuffer.allocate(data.size.toInt)
           ret.put(data.nioByteBuffer())
           ret.flip()
-          result = Left(new NioManagedBuffer(ret))
-          lock.notify()
+          result.success(new NioManagedBuffer(ret))
         }
-      }
-    })
+      })
 
-    // Sleep until result is no longer null
-    lock.synchronized {
-      while (result == null) {
-        try {
-          lock.wait()
-        } catch {
-          case e: InterruptedException =>
-        }
-      }
-    }
-
-    result match {
-      case Left(data) => data
-      case Right(e) => throw e
-    }
+    Await.result(result.future, Duration.Inf)
   }
 
   /**
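
The hunk above replaces a hand-rolled wait/notify monitor in fetchBlockSync with a Promise that the listener completes. For readers more at home in Java, a comparable bridge from a callback API to a blocking call can be sketched with CompletableFuture. This is an analogy rather than code from the patch: AsyncFetcher, demo and the byte[] payload are hypothetical stand-ins.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FetchBlockSyncSketch {

  /** Same two callbacks as the listener in this patch; byte[] stands in for ManagedBuffer. */
  interface BlockFetchingListener {
    void onBlockFetchSuccess(String blockId, byte[] data);
    void onBlockFetchFailure(String blockId, Throwable exception);
  }

  /** Hypothetical asynchronous fetch API that reports results through a listener. */
  interface AsyncFetcher {
    void fetchBlocks(String[] blockIds, BlockFetchingListener listener);
  }

  /** Blocks the calling thread until the single requested block arrives or fails. */
  static byte[] fetchBlockSync(AsyncFetcher fetcher, String blockId) throws Exception {
    CompletableFuture<byte[]> result = new CompletableFuture<>();
    fetcher.fetchBlocks(new String[] {blockId}, new BlockFetchingListener() {
      @Override
      public void onBlockFetchSuccess(String id, byte[] data) {
        // Copy before completing, since the original buffer may be released
        // once this callback returns.
        result.complete(data.clone());
      }

      @Override
      public void onBlockFetchFailure(String id, Throwable exception) {
        result.completeExceptionally(exception);
      }
    });
    try {
      return result.get();
    } catch (ExecutionException e) {
      // Surface the original failure rather than the wrapper where possible.
      Throwable cause = e.getCause();
      if (cause instanceof Exception) {
        throw (Exception) cause;
      }
      throw e;
    }
  }

  /** Completes the fetch from another thread and returns the received length. */
  static int demo() throws Exception {
    AsyncFetcher fetcher = (blockIds, listener) ->
        new Thread(() -> listener.onBlockFetchSuccess(blockIds[0], new byte[] {1, 2, 3})).start();
    return fetchBlockSync(fetcher, "shuffle_0_0_0").length;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(demo());
  }
}
```

As with the Promise version, the future carries both the value and the failure, so no shared mutable result or explicit lock is needed.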

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/main/scala/org/apache/spark/network/netty/NettyBlockFetcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockFetcher.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockFetcher.scala
deleted file mode 100644
index 8c5ffd8..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockFetcher.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty
-
-import java.nio.ByteBuffer
-import java.util
-
-import org.apache.spark.{SparkConf, Logging}
-import org.apache.spark.network.BlockFetchingListener
-import org.apache.spark.network.netty.NettyMessages._
-import org.apache.spark.serializer.{JavaSerializer, Serializer}
-import org.apache.spark.network.buffer.ManagedBuffer
-import org.apache.spark.network.client.{RpcResponseCallback, ChunkReceivedCallback, TransportClient}
-import org.apache.spark.storage.BlockId
-import org.apache.spark.util.Utils
-
-/**
- * Responsible for holding the state for a request for a single set of blocks. This assumes that
- * the chunks will be returned in the same order as requested, and that there will be exactly
- * one chunk per block.
- *
- * Upon receipt of any block, the listener will be called back. Upon failure part way through,
- * the listener will receive a failure callback for each outstanding block.
- */
-class NettyBlockFetcher(
-    serializer: Serializer,
-    client: TransportClient,
-    blockIds: Seq[String],
-    listener: BlockFetchingListener)
-  extends Logging {
-
-  require(blockIds.nonEmpty)
-
-  private val ser = serializer.newInstance()
-
-  private var streamHandle: ShuffleStreamHandle = _
-
-  private val chunkCallback = new ChunkReceivedCallback {
-    // On receipt of a chunk, pass it upwards as a block.
-    def onSuccess(chunkIndex: Int, buffer: ManagedBuffer): Unit = Utils.logUncaughtExceptions {
-      listener.onBlockFetchSuccess(blockIds(chunkIndex), buffer)
-    }
-
-    // On receipt of a failure, fail every block from chunkIndex onwards.
-    def onFailure(chunkIndex: Int, e: Throwable): Unit = {
-      blockIds.drop(chunkIndex).foreach { blockId =>
-        listener.onBlockFetchFailure(blockId, e);
-      }
-    }
-  }
-
-  /** Begins the fetching process, calling the listener with every block fetched. */
-  def start(): Unit = {
-    // Send the RPC to open the given set of blocks. This will return a ShuffleStreamHandle.
-    client.sendRpc(ser.serialize(OpenBlocks(blockIds.map(BlockId.apply))).array(),
-      new RpcResponseCallback {
-        override def onSuccess(response: Array[Byte]): Unit = {
-          try {
-            streamHandle = ser.deserialize[ShuffleStreamHandle](ByteBuffer.wrap(response))
-            logTrace(s"Successfully opened block set: $streamHandle! Preparing to fetch chunks.")
-
-            // Immediately request all chunks -- we expect that the total size of the request is
-            // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
-            for (i <- 0 until streamHandle.numChunks) {
-              client.fetchChunk(streamHandle.streamId, i, chunkCallback)
-            }
-          } catch {
-            case e: Exception =>
-              logError("Failed while starting block fetches", e)
-              blockIds.foreach(blockId => Utils.tryLog(listener.onBlockFetchFailure(blockId, e)))
-          }
-        }
-
-        override def onFailure(e: Throwable): Unit = {
-          logError("Failed while starting block fetches", e)
-          blockIds.foreach(blockId => Utils.tryLog(listener.onBlockFetchFailure(blockId, e)))
-        }
-      })
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index 02c657e..1950e7b 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -19,39 +19,41 @@ package org.apache.spark.network.netty
 
 import java.nio.ByteBuffer
 
+import scala.collection.JavaConversions._
+
 import org.apache.spark.Logging
 import org.apache.spark.network.BlockDataManager
+import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
+import org.apache.spark.network.server.{OneForOneStreamManager, RpcHandler, StreamManager}
+import org.apache.spark.network.shuffle.ShuffleStreamHandle
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.network.buffer.{NioManagedBuffer, ManagedBuffer}
-import org.apache.spark.network.client.{TransportClient, RpcResponseCallback}
-import org.apache.spark.network.server.{DefaultStreamManager, RpcHandler}
-import org.apache.spark.storage.{StorageLevel, BlockId}
-
-import scala.collection.JavaConversions._
+import org.apache.spark.storage.{BlockId, StorageLevel}
 
 object NettyMessages {
-
   /** Request to read a set of blocks. Returns [[ShuffleStreamHandle]] to identify the stream. */
   case class OpenBlocks(blockIds: Seq[BlockId])
 
   /** Request to upload a block with a certain StorageLevel. Returns nothing (empty byte array). */
   case class UploadBlock(blockId: BlockId, blockData: Array[Byte], level: StorageLevel)
-
-  /** Identifier for a fixed number of chunks to read from a stream created by [[OpenBlocks]]. */
-  case class ShuffleStreamHandle(streamId: Long, numChunks: Int)
 }
 
 /**
  * Serves requests to open blocks by simply registering one chunk per block requested.
+ * Handles opening and uploading arbitrary BlockManager blocks.
+ *
+ * Opened blocks are registered with the "one-for-one" strategy, meaning each Transport-layer Chunk
+ * is equivalent to one Spark-level shuffle block.
  */
 class NettyBlockRpcServer(
     serializer: Serializer,
-    streamManager: DefaultStreamManager,
     blockManager: BlockDataManager)
   extends RpcHandler with Logging {
 
   import NettyMessages._
 
+  private val streamManager = new OneForOneStreamManager()
+
   override def receive(
       client: TransportClient,
       messageBytes: Array[Byte],
@@ -73,4 +75,6 @@ class NettyBlockRpcServer(
         responseContext.onSuccess(new Array[Byte](0))
     }
   }
+
+  override def getStreamManager(): StreamManager = streamManager
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 38a3e94..ec3000e 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -17,15 +17,15 @@
 
 package org.apache.spark.network.netty
 
-import scala.concurrent.{Promise, Future}
+import scala.concurrent.{Future, Promise}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.ManagedBuffer
-import org.apache.spark.network.client.{RpcResponseCallback, TransportClient, TransportClientFactory}
-import org.apache.spark.network.netty.NettyMessages.UploadBlock
+import org.apache.spark.network.client.{RpcResponseCallback, TransportClientFactory}
+import org.apache.spark.network.netty.NettyMessages.{OpenBlocks, UploadBlock}
 import org.apache.spark.network.server._
-import org.apache.spark.network.util.{ConfigProvider, TransportConf}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, OneForOneBlockFetcher}
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.util.Utils
@@ -37,30 +37,29 @@ class NettyBlockTransferService(conf: SparkConf) extends BlockTransferService {
   // TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
   val serializer = new JavaSerializer(conf)
 
-  // Create a TransportConfig using SparkConf.
-  private[this] val transportConf = new TransportConf(
-    new ConfigProvider { override def get(name: String) = conf.get(name) })
-
   private[this] var transportContext: TransportContext = _
   private[this] var server: TransportServer = _
   private[this] var clientFactory: TransportClientFactory = _
 
   override def init(blockDataManager: BlockDataManager): Unit = {
-    val streamManager = new DefaultStreamManager
-    val rpcHandler = new NettyBlockRpcServer(serializer, streamManager, blockDataManager)
-    transportContext = new TransportContext(transportConf, streamManager, rpcHandler)
+    val rpcHandler = new NettyBlockRpcServer(serializer, blockDataManager)
+    transportContext = new TransportContext(SparkTransportConf.fromSparkConf(conf), rpcHandler)
     clientFactory = transportContext.createClientFactory()
     server = transportContext.createServer()
+    logInfo("Server created on " + server.getPort)
   }
 
   override def fetchBlocks(
-      hostname: String,
+      host: String,
       port: Int,
-      blockIds: Seq[String],
+      execId: String,
+      blockIds: Array[String],
       listener: BlockFetchingListener): Unit = {
+    logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
     try {
-      val client = clientFactory.createClient(hostname, port)
-      new NettyBlockFetcher(serializer, client, blockIds, listener).start()
+      val client = clientFactory.createClient(host, port)
+      new OneForOneBlockFetcher(client, blockIds.toArray, listener)
+        .start(OpenBlocks(blockIds.map(BlockId.apply)))
     } catch {
       case e: Exception =>
         logError("Exception while beginning fetchBlocks", e)

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
new file mode 100644
index 0000000..9fa4fa7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import org.apache.spark.SparkConf
+import org.apache.spark.network.util.{TransportConf, ConfigProvider}
+
+/**
+ * Utility for creating a [[TransportConf]] from a [[SparkConf]].
+ */
+object SparkTransportConf {
+  def fromSparkConf(conf: SparkConf): TransportConf = {
+    new TransportConf(new ConfigProvider {
+      override def get(name: String): String = conf.get(name)
+    })
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
index 11793ea..f56d165 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
 
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.network.shuffle.BlockFetchingListener
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.util.Utils
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
@@ -79,13 +80,14 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa
   }
 
   override def fetchBlocks(
-      hostName: String,
+      host: String,
       port: Int,
-      blockIds: Seq[String],
+      execId: String,
+      blockIds: Array[String],
       listener: BlockFetchingListener): Unit = {
     checkInit()
 
-    val cmId = new ConnectionManagerId(hostName, port)
+    val cmId = new ConnectionManagerId(host, port)
     val blockMessageArray = new BlockMessageArray(blockIds.map { blockId =>
       BlockMessage.fromGetBlock(GetBlock(BlockId(blockId)))
     })

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f81fa6d..af17b5d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -124,6 +124,9 @@ class DAGScheduler(
   /** If enabled, we may run certain actions like take() and first() locally. */
   private val localExecutionEnabled = sc.getConf.getBoolean("spark.localExecution.enabled", false)
 
+  /** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
+  private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)
+
   private def initializeEventProcessActor() {
     // blocking the thread until supervisor is started, which ensures eventProcessActor is
     // not null before any job is submitted
@@ -1064,7 +1067,9 @@ class DAGScheduler(
           runningStages -= failedStage
         }
 
-        if (failedStages.isEmpty && eventProcessActor != null) {
+        if (disallowStageRetryForTest) {
+          abortStage(failedStage, "Fetch failure will not retry stage due to testing config")
+        } else if (failedStages.isEmpty && eventProcessActor != null) {
           // Don't schedule an event to resubmit failed stages if failed isn't empty, because
           // in that case the event will already have been scheduled. eventProcessActor may be
           // null during unit tests.
@@ -1086,7 +1091,7 @@ class DAGScheduler(
 
         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
-          handleExecutorLost(bmAddress.executorId, Some(task.epoch))
+          handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch))
         }
 
       case ExceptionFailure(className, description, stackTrace, metrics) =>
@@ -1106,25 +1111,35 @@ class DAGScheduler(
    * Responds to an executor being lost. This is called inside the event loop, so it assumes it can
    * modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
    *
+   * We will also assume that we've lost all shuffle blocks associated with the executor if the
+   * executor serves its own blocks (i.e., we're not using external shuffle) OR a FetchFailed
+   * occurred, in which case we presume all shuffle data related to this executor to be lost.
+   *
    * Optionally the epoch during which the failure was caught can be passed to avoid allowing
    * stray fetch failures from possibly retriggering the detection of a node as lost.
    */
-  private[scheduler] def handleExecutorLost(execId: String, maybeEpoch: Option[Long] = None) {
+  private[scheduler] def handleExecutorLost(
+      execId: String,
+      fetchFailed: Boolean,
+      maybeEpoch: Option[Long] = None) {
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
     if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
       failedEpoch(execId) = currentEpoch
       logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
       blockManagerMaster.removeExecutor(execId)
-      // TODO: This will be really slow if we keep accumulating shuffle map stages
-      for ((shuffleId, stage) <- shuffleToMapStage) {
-        stage.removeOutputsOnExecutor(execId)
-        val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray
-        mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true)
-      }
-      if (shuffleToMapStage.isEmpty) {
-        mapOutputTracker.incrementEpoch()
+
+      if (!env.blockManager.externalShuffleServiceEnabled || fetchFailed) {
+        // TODO: This will be really slow if we keep accumulating shuffle map stages
+        for ((shuffleId, stage) <- shuffleToMapStage) {
+          stage.removeOutputsOnExecutor(execId)
+          val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray
+          mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true)
+        }
+        if (shuffleToMapStage.isEmpty) {
+          mapOutputTracker.incrementEpoch()
+        }
+        clearCacheLocs()
       }
-      clearCacheLocs()
     } else {
       logDebug("Additional executor lost message for " + execId +
                "(epoch " + currentEpoch + ")")
@@ -1382,7 +1397,7 @@ private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGSchedule
       dagScheduler.handleExecutorAdded(execId, host)
 
     case ExecutorLost(execId) =>
-      dagScheduler.handleExecutorLost(execId)
+      dagScheduler.handleExecutorLost(execId, fetchFailed = false)
 
     case BeginEvent(task, taskInfo) =>
       dagScheduler.handleBeginEvent(task, taskInfo)
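Note on the handleExecutorLost change above: the new guard reduces to a two-input decision. The sketch below only restates that condition as a standalone function; `shuffleOutputsLost` is a hypothetical name, and nothing here touches the scheduler's real state.

```scala
object ExecutorLossSketch {
  // Mirrors the guard added to handleExecutorLost: map outputs are treated as
  // lost unless an external service can still serve them and no fetch failed.
  def shuffleOutputsLost(externalShuffleServiceEnabled: Boolean, fetchFailed: Boolean): Boolean =
    !externalShuffleServiceEnabled || fetchFailed

  def main(args: Array[String]): Unit = {
    for (external <- Seq(false, true); failed <- Seq(false, true)) {
      val lost = shuffleOutputsLost(external, failed)
      println(s"external=$external fetchFailed=$failed -> outputsLost=$lost")
    }
  }
}
```

The only combination that keeps the outputs registered is an enabled external service with no fetch failure, which is the ExecutorLost event path in the hunk above.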

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 071568c..cc13f57 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -102,6 +102,11 @@ private[spark] class Stage(
     }
   }
 
+  /**
+   * Removes all shuffle outputs associated with this executor. Note that this will also remove
+   * outputs which are served by an external shuffle server (if one exists), as they are still
+   * registered with this execId.
+   */
   def removeOutputsOnExecutor(execId: String) {
     var becameUnavailable = false
     for (partition <- 0 until numPartitions) {
@@ -131,4 +136,9 @@ private[spark] class Stage(
   override def toString = "Stage " + id
 
   override def hashCode(): Int = id
+
+  override def equals(other: Any): Boolean = other match {
+    case stage: Stage => stage != null && stage.id == id
+    case _ => false
+  }
 }
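Note on the Stage.equals addition above: Stage already defined hashCode as its id, and the new equals makes equality agree with it. The sketch below shows the effect on a hash-based collection. `StageLike` is a hypothetical class for illustration, not Spark's Stage.

```scala
import scala.collection.mutable

// Hypothetical class, not Spark's Stage: identity is defined by `id` alone.
class StageLike(val id: Int, val name: String) {
  override def hashCode(): Int = id

  override def equals(other: Any): Boolean = other match {
    case s: StageLike => s.id == id // a type pattern never matches null
    case _ => false
  }
}

object StageEqualitySketch {
  // Counts distinct stages by inserting them into a hash set, which relies on
  // hashCode and equals being consistent with each other.
  def distinctCount(stages: Seq[StageLike]): Int = {
    val seen = mutable.HashSet[StageLike]()
    stages.foreach(seen += _)
    seen.size
  }
}
```

Because a type pattern does not match null, the explicit null check in the patch is harmless but likely redundant.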

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a6c23fc..376821f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -687,10 +687,11 @@ private[spark] class TaskSetManager(
       addPendingTask(index, readding=true)
     }
 
-    // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage.
+    // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage,
+    // and we are not using an external shuffle server which could serve the shuffle outputs.
     // The reason is the next stage wouldn't be able to fetch the data from this dead executor
     // so we would need to rerun these tasks on other executors.
-    if (tasks(0).isInstanceOf[ShuffleMapTask]) {
+    if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled) {
       for ((tid, info) <- taskInfos if info.executorId == execId) {
         val index = taskInfos(tid).index
         if (successful(index)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
index 1fb5b2c..f03e8e4 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
@@ -62,7 +62,8 @@ private[spark] trait ShuffleWriterGroup {
  * each block stored in each file. In order to find the location of a shuffle block, we search the
  * files within a ShuffleFileGroups associated with the block's reducer.
  */
-
+// Note: Changes to the format in this file should be kept in sync with
+// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getHashBasedShuffleBlockData().
 private[spark]
 class FileShuffleBlockManager(conf: SparkConf)
   extends ShuffleBlockManager with Logging {

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
index e9805c9..a48f0c9 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
@@ -35,6 +35,8 @@ import org.apache.spark.storage._
  * as the filename postfix for data file, and ".index" as the filename postfix for index file.
  *
  */
+// Note: Changes to the format in this file should be kept in sync with
+// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getSortBasedShuffleBlockData().
 private[spark]
 class IndexShuffleBlockManager extends ShuffleBlockManager {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
index 6cf9305..f49917b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
@@ -74,7 +74,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
 
     val blockFetcherItr = new ShuffleBlockFetcherIterator(
       context,
-      SparkEnv.get.blockTransferService,
+      SparkEnv.get.blockManager.shuffleClient,
       blockManager,
       blocksByAddress,
       serializer,

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index 746ed33..183a303 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -107,7 +107,7 @@ private[spark] class HashShuffleWriter[K, V](
       writer.commitAndClose()
       writer.fileSegment().length
     }
-    MapStatus(blockManager.blockManagerId, sizes)
+    MapStatus(blockManager.shuffleServerId, sizes)
   }
 
   private def revertWrites(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 927481b..d75f9d7 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -70,7 +70,7 @@ private[spark] class SortShuffleWriter[K, V, C](
     val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
     shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
 
-    mapStatus = MapStatus(blockManager.blockManagerId, partitionLengths)
+    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
   }
 
   /** Close this writer, passing along whether the map completed */

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/main/scala/org/apache/spark/storage/BlockId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index 8df5ec6..1f01294 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -53,6 +53,8 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
   def name = "rdd_" + rddId + "_" + splitIndex
 }
 
+// Format of the shuffle block ids (including data and index) should be kept in sync with
+// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getBlockData().
 @DeveloperApi
 case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
   def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
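Note on the ShuffleBlockId comment above: an external service only receives the block name as a string, so it has to recover the three ids from it, which is why the format must stay in sync. A minimal sketch of that round trip follows. The `parse` helper is hypothetical and is not the code in StandaloneShuffleBlockManager.

```scala
object ShuffleBlockIdSketch {
  // Matches names of the form shuffle_<shuffleId>_<mapId>_<reduceId>.
  private val ShufflePattern = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r

  // Same concatenation as ShuffleBlockId.name in the hunk above.
  def format(shuffleId: Int, mapId: Int, reduceId: Int): String =
    "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId

  // Returns None for anything that is not a plain shuffle block name.
  // The regex extractor must match the whole string, so suffixes are rejected.
  def parse(name: String): Option[(Int, Int, Int)] = name match {
    case ShufflePattern(s, m, r) => Some((s.toInt, m.toInt, r.toInt))
    case _ => None
  }
}
```

If either side changes the separator or the field order, parsing on the service side would fail or, worse, silently resolve to the wrong file.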

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 58510d7..1f8de28 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -21,9 +21,9 @@ import java.io.{BufferedOutputStream, ByteArrayOutputStream, File, InputStream,
 import java.nio.{ByteBuffer, MappedByteBuffer}
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.concurrent.{Await, Future}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
-import scala.concurrent.{Await, Future}
 import scala.util.Random
 
 import akka.actor.{ActorSystem, Props}
@@ -34,8 +34,13 @@ import org.apache.spark.executor._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.network.netty.{SparkTransportConf, NettyBlockTransferService}
+import org.apache.spark.network.shuffle.{ExecutorShuffleInfo, ExternalShuffleClient}
+import org.apache.spark.network.util.{ConfigProvider, TransportConf}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.ShuffleManager
+import org.apache.spark.shuffle.hash.HashShuffleManager
+import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.util._
 
 private[spark] sealed trait BlockValues
@@ -85,9 +90,38 @@ private[spark] class BlockManager(
     new TachyonStore(this, tachyonBlockManager)
   }
 
+  private[spark]
+  val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
+  private val externalShuffleServicePort = conf.getInt("spark.shuffle.service.port", 7337)
+  // Check that we're not using external shuffle service with consolidated shuffle files.
+  if (externalShuffleServiceEnabled
+      && conf.getBoolean("spark.shuffle.consolidateFiles", false)
+      && shuffleManager.isInstanceOf[HashShuffleManager]) {
+    throw new UnsupportedOperationException("Cannot use external shuffle service with consolidated"
+      + " shuffle files in hash-based shuffle. Please disable spark.shuffle.consolidateFiles or"
+      + " switch to sort-based shuffle.")
+  }
+
   val blockManagerId = BlockManagerId(
     executorId, blockTransferService.hostName, blockTransferService.port)
 
+  // Address of the server that serves this executor's shuffle files. This is either an external
+  // service, or just our own Executor's BlockManager.
+  private[spark] val shuffleServerId = if (externalShuffleServiceEnabled) {
+    BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
+  } else {
+    blockManagerId
+  }
+
+  // Client to read other executors' shuffle files. This is either an external service, or just the
+  // standard BlockTransferService to directly connect to other Executors.
+  private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
+    val appId = conf.get("spark.app.id", "unknown-app-id")
+    new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf), appId)
+  } else {
+    blockTransferService
+  }
+
   // Whether to compress broadcast variables that are stored
   private val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true)
   // Whether to compress shuffle output that are stored
@@ -143,10 +177,41 @@ private[spark] class BlockManager(
 
   /**
    * Initialize the BlockManager. Register to the BlockManagerMaster, and start the
-   * BlockManagerWorker actor.
+   * BlockManagerWorker actor. Additionally registers with a local shuffle service if configured.
    */
   private def initialize(): Unit = {
     master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
+
+    // Register Executors' configuration with the local shuffle service, if one should exist.
+    if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
+      registerWithExternalShuffleServer()
+    }
+  }
+
+  private def registerWithExternalShuffleServer() {
+    logInfo("Registering executor with local external shuffle service.")
+    val shuffleConfig = new ExecutorShuffleInfo(
+      diskBlockManager.localDirs.map(_.toString),
+      diskBlockManager.subDirsPerLocalDir,
+      shuffleManager.getClass.getName)
+
+    val MAX_ATTEMPTS = 3
+    val SLEEP_TIME_SECS = 5
+
+    for (i <- 1 to MAX_ATTEMPTS) {
+      try {
+        // Synchronous and will throw an exception if we cannot connect.
+        shuffleClient.asInstanceOf[ExternalShuffleClient].registerWithShuffleServer(
+          shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)
+        return
+      } catch {
+        case e: Exception if i < MAX_ATTEMPTS =>
+          val attemptsRemaining = MAX_ATTEMPTS - i
+          logError(s"Failed to connect to external shuffle server, will retry $attemptsRemaining"
+            + s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
+          Thread.sleep(SLEEP_TIME_SECS * 1000)
+      }
+    }
   }
 
   /**
@@ -506,7 +571,7 @@ private[spark] class BlockManager(
     for (loc <- locations) {
       logDebug(s"Getting remote block $blockId from $loc")
       val data = blockTransferService.fetchBlockSync(
-        loc.host, loc.port, blockId.toString).nioByteBuffer()
+        loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
 
       if (data != null) {
         if (asBlockResult) {
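Note on registerWithExternalShuffleServer above: the loop is a bounded retry with a fixed sleep, and the guard on the catch clause means the final failure propagates to the caller. The sketch below isolates that pattern. `RetrySketch.retry` is a hypothetical helper, and it catches non-fatal throwables where the patch catches Exception.

```scala
import scala.util.control.NonFatal

object RetrySketch {
  // Runs `op` up to `maxAttempts` times, sleeping between failures. The guard
  // on the catch clause lets the last failure escape to the caller.
  def retry[T](maxAttempts: Int, sleepMillis: Long)(op: => T): T = {
    require(maxAttempts >= 1, "maxAttempts must be at least 1")
    var attempt = 1
    var result: Option[T] = None
    while (result.isEmpty) {
      try {
        result = Some(op)
      } catch {
        case NonFatal(e) if attempt < maxAttempts =>
          val remaining = maxAttempts - attempt
          println(s"Attempt $attempt failed (${e.getMessage}), $remaining left")
          Thread.sleep(sleepMillis)
          attempt += 1
      }
    }
    result.get
  }
}
```

A call such as `RetrySketch.retry(3, 5000L) { register() }` would match the constants in the patch, where `register` is a placeholder for the synchronous registration call.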

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 99e9253..58fba54 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -38,12 +38,13 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
   extends Logging {
 
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
-  private val subDirsPerLocalDir = blockManager.conf.getInt("spark.diskStore.subDirectories", 64)
+  private[spark]
+  val subDirsPerLocalDir = blockManager.conf.getInt("spark.diskStore.subDirectories", 64)
 
   /* Create one local directory for each path mentioned in spark.local.dir; then, inside this
    * directory, create multiple subdirectories that we will hash files into, in order to avoid
    * having really large inodes at the top level. */
-  val localDirs: Array[File] = createLocalDirs(conf)
+  private[spark] val localDirs: Array[File] = createLocalDirs(conf)
   if (localDirs.isEmpty) {
     logError("Failed to create any local dir.")
     System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
@@ -52,6 +53,9 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
 
   addShutdownHook()
 
+  /** Looks up a file by hashing it into one of our local subdirectories. */
+  // This method should be kept in sync with
+  // org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getFile().
   def getFile(filename: String): File = {
     // Figure out which local directory it hashes to, and which subdirectory in that
     val hash = Utils.nonNegativeHash(filename)
@@ -159,13 +163,16 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
 
   /** Cleanup local dirs and stop shuffle sender. */
   private[spark] def stop() {
-    localDirs.foreach { localDir =>
-      if (localDir.isDirectory() && localDir.exists()) {
-        try {
-          if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
-        } catch {
-          case e: Exception =>
-            logError(s"Exception while deleting local spark dir: $localDir", e)
+    // Only perform cleanup if an external service is not serving our shuffle files.
+    if (!blockManager.externalShuffleServiceEnabled) {
+      localDirs.foreach { localDir =>
+        if (localDir.isDirectory() && localDir.exists()) {
+          try {
+            if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
+          } catch {
+            case e: Exception =>
+              logError(s"Exception while deleting local spark dir: $localDir", e)
+          }
         }
       }
     }
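Note on getFile above: the external service has to find shuffle files without asking the executor, so it must reproduce the same filename-to-directory mapping from the registered localDirs and subDirsPerLocalDir. The hunk shows only the first step of that mapping, so the arithmetic below is an assumption about how the hash is split into a directory index and a subdirectory index, as is the handling of Int.MinValue.

```scala
object LocalDirHashSketch {
  // Assumed behaviour of a non-negative hash: Int.MinValue has no positive
  // counterpart, so it is mapped to 0; everything else uses the absolute value.
  def nonNegativeHash(s: String): Int = {
    val h = s.hashCode
    if (h == Int.MinValue) 0 else math.abs(h)
  }

  // Returns (index of the local dir, index of the subdirectory inside it).
  // Assumption: the low part of the hash picks the dir, the rest the subdir.
  def locate(filename: String, numLocalDirs: Int, subDirsPerLocalDir: Int): (Int, Int) = {
    val hash = nonNegativeHash(filename)
    val dirId = hash % numLocalDirs
    val subDirId = (hash / numLocalDirs) % subDirsPerLocalDir
    (dirId, subDirId)
  }
}
```

Whatever the exact formula, both sides need the same hash function, the same directory order, and the same subdirectory count, which is what ExecutorShuffleInfo carries at registration.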

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 0d6f3bf..ee89c7e 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -22,7 +22,8 @@ import java.util.concurrent.LinkedBlockingQueue
 import scala.collection.mutable.{ArrayBuffer, HashSet, Queue}
 
 import org.apache.spark.{Logging, TaskContext}
-import org.apache.spark.network.{BlockFetchingListener, BlockTransferService}
+import org.apache.spark.network.BlockTransferService
+import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
 import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.{CompletionIterator, Utils}
@@ -38,8 +39,8 @@ import org.apache.spark.util.{CompletionIterator, Utils}
  * using too much memory.
  *
  * @param context [[TaskContext]], used for metrics update
- * @param blockTransferService [[BlockTransferService]] for fetching remote blocks
- * @param blockManager  [[BlockManager]] for reading local blocks
+ * @param shuffleClient [[ShuffleClient]] for fetching remote blocks
+ * @param blockManager [[BlockManager]] for reading local blocks
  * @param blocksByAddress list of blocks to fetch grouped by the [[BlockManagerId]].
  *                        For each block we also require the size (in bytes as a long field) in
  *                        order to throttle the memory usage.
@@ -49,7 +50,7 @@ import org.apache.spark.util.{CompletionIterator, Utils}
 private[spark]
 final class ShuffleBlockFetcherIterator(
     context: TaskContext,
-    blockTransferService: BlockTransferService,
+    shuffleClient: ShuffleClient,
     blockManager: BlockManager,
     blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
     serializer: Serializer,
@@ -140,7 +141,8 @@ final class ShuffleBlockFetcherIterator(
     val sizeMap = req.blocks.map { case (blockId, size) => (blockId.toString, size) }.toMap
     val blockIds = req.blocks.map(_._1.toString)
 
-    blockTransferService.fetchBlocks(req.address.host, req.address.port, blockIds,
+    val address = req.address
+    shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
       new BlockFetchingListener {
         override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
           // Only add the buffer to results queue if the iterator is not zombie,
@@ -179,7 +181,7 @@ final class ShuffleBlockFetcherIterator(
     var totalBlocks = 0
     for ((address, blockInfos) <- blocksByAddress) {
       totalBlocks += blockInfos.size
-      if (address == blockManager.blockManagerId) {
+      if (address.executorId == blockManager.blockManagerId.executorId) {
         // Filter out zero-sized blocks
         localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
         numBlocksToFetch += localBlocks.size
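Note on the local-block check above: once map outputs are registered under shuffleServerId, their address carries the external service's port, so comparing the whole address against blockManagerId would no longer recognise an executor's own blocks. The sketch below illustrates why the comparison moved to executorId. `Address` is a hypothetical stand-in for BlockManagerId.

```scala
// Hypothetical stand-in for BlockManagerId.
final case class Address(executorId: String, host: String, port: Int)

object LocalBlockCheckSketch {
  // Same executor and host, but the port of the external service when enabled.
  def shuffleServerId(own: Address, externalEnabled: Boolean, servicePort: Int): Address =
    if (externalEnabled) own.copy(port = servicePort) else own

  // Local means "written by this executor", regardless of who serves the file.
  def isLocal(blockAddress: Address, own: Address): Boolean =
    blockAddress.executorId == own.executorId
}
```

With the service enabled, the served address differs from the executor's own address only in the port, yet the blocks are still on the local disk and can be read directly.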

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 063895d..68d378f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1237,6 +1237,8 @@ private[spark] object Utils extends Logging {
   }
 
   // Handles idiosyncracies with hash (add more as required)
+  // This method should be kept in sync with
+  // org.apache.spark.network.util.JavaUtils#nonNegativeHash().
   def nonNegativeHash(obj: AnyRef): Int = {
 
     // Required ?

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 81b64c3..429199f 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -202,7 +202,8 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
     val blockManager = SparkEnv.get.blockManager
     val blockTransfer = SparkEnv.get.blockTransferService
     blockManager.master.getLocations(blockId).foreach { cmId =>
-      val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, blockId.toString)
+      val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId,
+        blockId.toString)
       val deserialized = blockManager.dataDeserialize(blockId, bytes.nioByteBuffer())
         .asInstanceOf[Iterator[Int]].toList
       assert(deserialized === (1 to 100).toList)

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
new file mode 100644
index 0000000..792b9cd
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.network.TransportContext
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.server.TransportServer
+import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleClient}
+
+/**
+ * This suite creates an external shuffle server and routes all shuffle fetches through it.
+ * Note that failures in this suite may arise due to changes in Spark that invalidate expectations
+ * set up in [[ExternalShuffleBlockHandler]], such as changing the format of shuffle files or how
+ * we hash files into folders.
+ */
+class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
+  var server: TransportServer = _
+  var rpcHandler: ExternalShuffleBlockHandler = _
+
+  override def beforeAll() {
+    val transportConf = SparkTransportConf.fromSparkConf(conf)
+    rpcHandler = new ExternalShuffleBlockHandler()
+    val transportContext = new TransportContext(transportConf, rpcHandler)
+    server = transportContext.createServer()
+
+    conf.set("spark.shuffle.manager", "sort")
+    conf.set("spark.shuffle.service.enabled", "true")
+    conf.set("spark.shuffle.service.port", server.getPort.toString)
+  }
+
+  override def afterAll() {
+    server.close()
+  }
+
+  // This test ensures that the external shuffle service is actually in use for the other tests.
+  test("using external shuffle service") {
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+    sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
+    sc.env.blockManager.shuffleClient.getClass should equal(classOf[ExternalShuffleClient])
+
+    val rdd = sc.parallelize(0 until 1000, 10).map(i => (i, 1)).reduceByKey(_ + _)
+
+    rdd.count()
+    rdd.count()
+
+    // Invalidate the registered executors, disallowing access to their shuffle blocks.
+    rpcHandler.clearRegisteredExecutors()
+
+    // Now Spark will receive FetchFailed, and not retry the stage due to "spark.test.noStageRetry"
+    // being set.
+    val e = intercept[SparkException] {
+      rdd.count()
+    }
+    e.getMessage should include ("Fetch failure will not retry stage due to testing config")
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala b/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
index 2acc02a..19180e8 100644
--- a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
@@ -24,10 +24,6 @@ class HashShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
   // This test suite should run all tests in ShuffleSuite with hash-based shuffle.
 
   override def beforeAll() {
-    System.setProperty("spark.shuffle.manager", "hash")
-  }
-
-  override def afterAll() {
-    System.clearProperty("spark.shuffle.manager")
+    conf.set("spark.shuffle.manager", "hash")
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
index 840d827..d78c99c 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
@@ -24,10 +24,6 @@ class ShuffleNettySuite extends ShuffleSuite with BeforeAndAfterAll {
   // This test suite should run all tests in ShuffleSuite with Netty shuffle mode.
 
   override def beforeAll() {
-    System.setProperty("spark.shuffle.blockTransferService", "netty")
-  }
-
-  override def afterAll() {
-    System.clearProperty("spark.shuffle.blockTransferService")
+    conf.set("spark.shuffle.blockTransferService", "netty")
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 2bdd84c..cda942e 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -30,10 +30,14 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
 
   val conf = new SparkConf(loadDefaults = false)
 
+  // Ensure that the DAGScheduler doesn't retry stages whose fetches fail, so that we accurately
+  // test that the shuffle works (rather than retrying until all blocks are local to one Executor).
+  conf.set("spark.test.noStageRetry", "true")
+
   test("groupByKey without compression") {
     try {
       System.setProperty("spark.shuffle.compress", "false")
-      sc = new SparkContext("local", "test")
+      sc = new SparkContext("local", "test", conf)
       val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
       val groups = pairs.groupByKey(4).collect()
       assert(groups.size === 2)
@@ -47,7 +51,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
   }
 
   test("shuffle non-zero block size") {
-    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
     val NUM_BLOCKS = 3
 
     val a = sc.parallelize(1 to 10, 2)
@@ -73,7 +77,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
 
   test("shuffle serializer") {
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
-    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
     val a = sc.parallelize(1 to 10, 2)
     val b = a.map { x =>
       (x, new NonJavaSerializableClass(x * 2))
@@ -89,7 +93,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
 
   test("zero sized blocks") {
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
-    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
 
     // 10 partitions from 4 keys
     val NUM_BLOCKS = 10
@@ -116,7 +120,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
 
   test("zero sized blocks without kryo") {
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
-    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
 
     // 10 partitions from 4 keys
     val NUM_BLOCKS = 10
@@ -141,7 +145,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
 
   test("shuffle on mutable pairs") {
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
-    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
     def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
     val data = Array(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
     val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
@@ -154,7 +158,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
   test("sorting on mutable pairs") {
     // This is not in SortingSuite because of the local cluster setup.
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
-    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
     def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
     val data = Array(p(1, 11), p(3, 33), p(100, 100), p(2, 22))
     val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
@@ -168,7 +172,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
 
   test("cogroup using mutable pairs") {
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
-    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
     def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
     val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
     val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"), p(3, "3"))
@@ -195,7 +199,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
 
   test("subtract mutable pairs") {
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
-    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
     def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
     val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1), p(3, 33))
     val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"))
@@ -209,11 +213,8 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
 
   test("sort with Java non serializable class - Kryo") {
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
-    val conf = new SparkConf()
-      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .setAppName("test")
-      .setMaster("local-cluster[2,1,512]")
-    sc = new SparkContext(conf)
+    val myConf = conf.clone().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+    sc = new SparkContext("local-cluster[2,1,512]", "test", myConf)
     val a = sc.parallelize(1 to 10, 2)
     val b = a.map { x =>
       (new NonJavaSerializableClass(x), x)
@@ -226,10 +227,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
 
   test("sort with Java non serializable class - Java") {
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
-    val conf = new SparkConf()
-      .setAppName("test")
-      .setMaster("local-cluster[2,1,512]")
-    sc = new SparkContext(conf)
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
     val a = sc.parallelize(1 to 10, 2)
     val b = a.map { x =>
       (new NonJavaSerializableClass(x), x)

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
index 639e56c..6335817 100644
--- a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
@@ -24,10 +24,6 @@ class SortShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
   // This test suite should run all tests in ShuffleSuite with sort-based shuffle.
 
   override def beforeAll() {
-    System.setProperty("spark.shuffle.manager", "sort")
-  }
-
-  override def afterAll() {
-    System.clearProperty("spark.shuffle.manager")
+    conf.set("spark.shuffle.manager", "sort")
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
index 3925f0c..bbdc956 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
@@ -121,7 +121,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter with PrivateMethod
     }
 
     val appId = "testId"
-    val executorId = "executor.1"
+    val executorId = "1"
     conf.set("spark.app.id", appId)
     conf.set("spark.executor.id", executorId)
 
@@ -138,7 +138,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter with PrivateMethod
       override val metricRegistry = new MetricRegistry()
     }
 
-    val executorId = "executor.1"
+    val executorId = "1"
     conf.set("spark.executor.id", executorId)
 
     val instanceName = "executor"

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 4e502cf..28f7665 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -21,22 +21,19 @@ import java.util.concurrent.Semaphore
 
 import scala.concurrent.future
 import scala.concurrent.ExecutionContext.Implicits.global
-import org.apache.spark.{TaskContextImpl, TaskContext}
-import org.apache.spark.network.{BlockFetchingListener, BlockTransferService}
 
-import org.mockito.Mockito._
 import org.mockito.Matchers.{any, eq => meq}
+import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
-
 import org.scalatest.FunSuite
 
-import org.apache.spark.{SparkConf, TaskContext}
+import org.apache.spark.{SparkConf, TaskContextImpl}
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.shuffle.BlockFetchingListener
 import org.apache.spark.serializer.TestSerializer
 
-
 class ShuffleBlockFetcherIteratorSuite extends FunSuite {
   // Some of the tests are quite tricky because we are testing the cleanup behavior
   // in the presence of faults.
@@ -44,10 +41,10 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
   /** Creates a mock [[BlockTransferService]] that returns data from the given map. */
   private def createMockTransfer(data: Map[BlockId, ManagedBuffer]): BlockTransferService = {
     val transfer = mock(classOf[BlockTransferService])
-    when(transfer.fetchBlocks(any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
+    when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
       override def answer(invocation: InvocationOnMock): Unit = {
-        val blocks = invocation.getArguments()(2).asInstanceOf[Seq[String]]
-        val listener = invocation.getArguments()(3).asInstanceOf[BlockFetchingListener]
+        val blocks = invocation.getArguments()(3).asInstanceOf[Array[String]]
+        val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
 
         for (blockId <- blocks) {
           if (data.contains(BlockId(blockId))) {
@@ -118,7 +115,7 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
     // 3 local blocks, and 2 remote blocks
     // (but from the same block manager so one call to fetchBlocks)
     verify(blockManager, times(3)).getBlockData(any())
-    verify(transfer, times(1)).fetchBlocks(any(), any(), any(), any())
+    verify(transfer, times(1)).fetchBlocks(any(), any(), any(), any(), any())
   }
 
   test("release current unexhausted buffer in case the task completes early") {
@@ -138,9 +135,9 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
     val sem = new Semaphore(0)
 
     val transfer = mock(classOf[BlockTransferService])
-    when(transfer.fetchBlocks(any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
+    when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
       override def answer(invocation: InvocationOnMock): Unit = {
-        val listener = invocation.getArguments()(3).asInstanceOf[BlockFetchingListener]
+        val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
         future {
           // Return the first two blocks, and wait till task completion before returning the 3rd one
           listener.onBlockFetchSuccess(
@@ -201,9 +198,9 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
     val sem = new Semaphore(0)
 
     val transfer = mock(classOf[BlockTransferService])
-    when(transfer.fetchBlocks(any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
+    when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
       override def answer(invocation: InvocationOnMock): Unit = {
-        val listener = invocation.getArguments()(3).asInstanceOf[BlockFetchingListener]
+        val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
         future {
           // Return the first block, and then fail.
           listener.onBlockFetchSuccess(

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/pom.xml
----------------------------------------------------------------------
diff --git a/network/common/pom.xml b/network/common/pom.xml
index a33e44b..ea88714 100644
--- a/network/common/pom.xml
+++ b/network/common/pom.xml
@@ -85,9 +85,25 @@
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
     <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
     <plugins>
+      <!-- Create a test-jar so network-shuffle can depend on our test utilities. -->
       <plugin>
-        <groupId>org.scalatest</groupId>
-        <artifactId>scalatest-maven-plugin</artifactId>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.2</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>test-jar-on-test-compile</id>
+            <phase>test-compile</phase>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
       </plugin>
     </plugins>
   </build>

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/main/java/org/apache/spark/network/TransportContext.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/TransportContext.java b/network/common/src/main/java/org/apache/spark/network/TransportContext.java
index 854aa66..a271841 100644
--- a/network/common/src/main/java/org/apache/spark/network/TransportContext.java
+++ b/network/common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -52,15 +52,13 @@ public class TransportContext {
   private final Logger logger = LoggerFactory.getLogger(TransportContext.class);
 
   private final TransportConf conf;
-  private final StreamManager streamManager;
   private final RpcHandler rpcHandler;
 
   private final MessageEncoder encoder;
   private final MessageDecoder decoder;
 
-  public TransportContext(TransportConf conf, StreamManager streamManager, RpcHandler rpcHandler) {
+  public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
     this.conf = conf;
-    this.streamManager = streamManager;
     this.rpcHandler = rpcHandler;
     this.encoder = new MessageEncoder();
     this.decoder = new MessageDecoder();
@@ -70,8 +68,14 @@ public class TransportContext {
     return new TransportClientFactory(this);
   }
 
+  /** Create a server which will attempt to bind to a specific port. */
+  public TransportServer createServer(int port) {
+    return new TransportServer(this, port);
+  }
+
+  /** Creates a new server, binding to any available ephemeral port. */
   public TransportServer createServer() {
-    return new TransportServer(this);
+    return new TransportServer(this, 0);
   }
 
   /**
@@ -109,7 +113,7 @@ public class TransportContext {
     TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
     TransportClient client = new TransportClient(channel, responseHandler);
     TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
-      streamManager, rpcHandler);
+      rpcHandler);
     return new TransportChannelHandler(client, responseHandler, requestHandler);
   }
 
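Note on the createServer() change above: the no-argument overload now passes port 0, which the diff's own Javadoc describes as binding to any available ephemeral port. The sketch below shows the same convention with plain java.net.ServerSocket rather than Spark's TransportServer. The class and method names (EphemeralPortSketch, bindAndReportPort) are hypothetical and not part of the patch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

// Hypothetical illustration only; the patch itself binds through TransportServer.
final class EphemeralPortSketch {
  /**
   * Binds a server socket to the requested port and returns the port that was
   * actually bound. Passing 0 asks the OS to pick any free ephemeral port.
   */
  static int bindAndReportPort(int requestedPort) {
    try (ServerSocket server = new ServerSocket(requestedPort)) {
      // getLocalPort() reports the real port, which differs from 0 after binding.
      return server.getLocalPort();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("OS assigned port " + bindAndReportPort(0));
  }
}
```

This is the same reason ExternalShuffleServiceSuite reads server.getPort after createServer() and writes it into "spark.shuffle.service.port": the port is only known once the bind has happened.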

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
index b1732fc..01c143f 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -19,9 +19,13 @@ package org.apache.spark.network.client;
 
 import java.io.Closeable;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.SettableFuture;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
@@ -129,7 +133,7 @@ public class TransportClient implements Closeable {
     final long startTime = System.currentTimeMillis();
     logger.trace("Sending RPC to {}", serverAddr);
 
-    final long requestId = UUID.randomUUID().getLeastSignificantBits();
+    final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
     handler.addRpcRequest(requestId, callback);
 
     channel.writeAndFlush(new RpcRequest(requestId, message)).addListener(
@@ -151,6 +155,32 @@ public class TransportClient implements Closeable {
       });
   }
 
+  /**
+   * Synchronously sends an opaque message to the RpcHandler on the server-side, waiting for up to
+   * a specified timeout for a response.
+   */
+  public byte[] sendRpcSync(byte[] message, long timeoutMs) {
+    final SettableFuture<byte[]> result = SettableFuture.create();
+
+    sendRpc(message, new RpcResponseCallback() {
+      @Override
+      public void onSuccess(byte[] response) {
+        result.set(response);
+      }
+
+      @Override
+      public void onFailure(Throwable e) {
+        result.setException(e);
+      }
+    });
+
+    try {
+      return result.get(timeoutMs, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
   @Override
   public void close() {
     // close is a local operation and should finish with milliseconds; timeout just to be safe
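
Note on sendRpcSync above: it turns the callback-based sendRpc into a blocking call by completing a future from the callback and waiting on it with a timeout. The sketch below reproduces that pattern with only the JDK (CompletableFuture instead of Guava's SettableFuture). The names SyncRpcSketch, Callback, AsyncSender, sendSync and nonNegativeRequestId are hypothetical stand-ins, not Spark APIs. Unlike the patch, the sketch restores the interrupt flag before rethrowing, which is an assumption about desired behavior rather than something the commit does.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SyncRpcSketch {
  /** Hypothetical stand-in for the patch's RpcResponseCallback. */
  interface Callback {
    void onSuccess(byte[] response);
    void onFailure(Throwable e);
  }

  /** Hypothetical stand-in for an asynchronous sendRpc(message, callback). */
  interface AsyncSender {
    void send(byte[] message, Callback callback);
  }

  /** Blocks for up to timeoutMs waiting for the asynchronous reply. */
  static byte[] sendSync(AsyncSender sender, byte[] message, long timeoutMs) {
    final CompletableFuture<byte[]> result = new CompletableFuture<>();
    sender.send(message, new Callback() {
      @Override public void onSuccess(byte[] response) { result.complete(response); }
      @Override public void onFailure(Throwable e) { result.completeExceptionally(e); }
    });
    try {
      return result.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // keep the interrupt visible to callers
      throw new RuntimeException(e);
    } catch (ExecutionException | TimeoutException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Clears the sign bit instead of calling Math.abs. Math.abs(Long.MIN_VALUE)
   * is still Long.MIN_VALUE, so masking is one way to guarantee a value >= 0.
   */
  static long nonNegativeRequestId() {
    return UUID.randomUUID().getLeastSignificantBits() & Long.MAX_VALUE;
  }
}
```

The masking helper relates to the requestId change in the same diff. Whether the single Long.MIN_VALUE edge case matters in practice is not something the commit discusses, so treat it as an optional hardening idea.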

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 10eb9ef..e7fa4f6 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -78,15 +78,17 @@ public class TransportClientFactory implements Closeable {
    *
    * Concurrency: This method is safe to call from multiple threads.
    */
-  public TransportClient createClient(String remoteHost, int remotePort) throws TimeoutException {
+  public TransportClient createClient(String remoteHost, int remotePort) {
     // Get connection from the connection pool first.
     // If it is not found or not active, create a new one.
     final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
     TransportClient cachedClient = connectionPool.get(address);
-    if (cachedClient != null && cachedClient.isActive()) {
-      return cachedClient;
-    } else if (cachedClient != null) {
-      connectionPool.remove(address, cachedClient); // Remove inactive clients.
+    if (cachedClient != null) {
+      if (cachedClient.isActive()) {
+        return cachedClient;
+      } else {
+        connectionPool.remove(address, cachedClient); // Remove inactive clients.
+      }
     }
 
     logger.debug("Creating new connection to " + address);
@@ -115,13 +117,14 @@ public class TransportClientFactory implements Closeable {
     // Connect to the remote server
     ChannelFuture cf = bootstrap.connect(address);
     if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) {
-      throw new TimeoutException(
+      throw new RuntimeException(
         String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));
     } else if (cf.cause() != null) {
       throw new RuntimeException(String.format("Failed to connect to %s", address), cf.cause());
     }
 
-    // Successful connection
+    // Successful connection -- in the event that two threads raced to create a client, we will
+    // use the first one that was put into the connectionPool and close the one we made here.
     assert client.get() != null : "Channel future completed successfully with null client";
     TransportClient oldClient = connectionPool.putIfAbsent(address, client.get());
     if (oldClient == null) {
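
Note on the comment added above: when two threads race to create a client for the same address, the one already in connectionPool wins and the loser is closed. The hunk is cut off before the body of that branch, so the sketch below is only a guess at the general shape, written against a plain ConcurrentHashMap. ConnectionPoolSketch and getOrCreate are hypothetical names, and the isActive() check from the real method is left out for brevity.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical illustration of "first writer wins, loser closes its own connection".
final class ConnectionPoolSketch<K, C extends AutoCloseable> {
  private final ConcurrentHashMap<K, C> pool = new ConcurrentHashMap<>();

  C getOrCreate(K key, Supplier<C> connect) {
    C cached = pool.get(key);
    if (cached != null) {
      return cached;
    }

    // Connecting happens outside any lock, so several threads may get here at once.
    C created = connect.get();

    // putIfAbsent is atomic: it returns null only for the thread that inserted first.
    C winner = pool.putIfAbsent(key, created);
    if (winner == null) {
      return created;
    }

    // Another thread won the race; discard our connection and reuse theirs.
    try {
      created.close();
    } catch (Exception e) {
      // Best effort in this sketch: the duplicate is being thrown away anyway.
    }
    return winner;
  }
}
```

The trade-off in this style is an occasional wasted connection attempt in exchange for never holding a lock while connecting.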



