spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject git commit: [SPARK-3281] Remove Netty specific code in BlockManager / shuffle
Date Thu, 28 Aug 2014 21:08:24 GMT
Repository: spark
Updated Branches:
  refs/heads/master 41dc5987d -> be53c54b5


[SPARK-3281] Remove Netty specific code in BlockManager / shuffle

Netty functionality will be added back in subsequent PRs by using the BlockTransferService
interface.

Author: Reynold Xin <rxin@apache.org>

Closes #2181 from rxin/SPARK-3281 and squashes the following commits:

5494b0e [Reynold Xin] Fix extra port.
ff6d1e1 [Reynold Xin] [SPARK-3281] Remove Netty specific code in BlockManager.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be53c54b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be53c54b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be53c54b

Branch: refs/heads/master
Commit: be53c54b5c685e1d04d49bd554e05029a5a106e1
Parents: 41dc598
Author: Reynold Xin <rxin@apache.org>
Authored: Thu Aug 28 14:08:07 2014 -0700
Committer: Reynold Xin <rxin@apache.org>
Committed: Thu Aug 28 14:08:07 2014 -0700

----------------------------------------------------------------------
 .../spark/storage/BlockFetcherIterator.scala    | 73 +-------------------
 .../org/apache/spark/storage/BlockManager.scala | 41 +----------
 .../apache/spark/storage/BlockManagerId.scala   | 20 ++----
 .../org/apache/spark/util/JsonProtocol.scala    |  6 +-
 .../apache/spark/MapOutputTrackerSuite.scala    | 30 ++++----
 .../spark/scheduler/DAGSchedulerSuite.scala     |  2 +-
 .../storage/BlockFetcherIteratorSuite.scala     | 16 ++---
 .../spark/storage/BlockManagerSuite.scala       |  6 +-
 .../storage/StorageStatusListenerSuite.scala    |  4 +-
 .../org/apache/spark/storage/StorageSuite.scala | 10 +--
 .../spark/ui/storage/StorageTabSuite.scala      |  2 +-
 .../org/apache/spark/util/AkkaUtilsSuite.scala  | 17 +++--
 .../apache/spark/util/JsonProtocolSuite.scala   | 15 ++--
 13 files changed, 64 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/be53c54b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index ca60ec7..4ab8ec8 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -33,16 +33,8 @@ import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.Utils
 
 /**
- * A block fetcher iterator interface. There are two implementations:
- *
- * BasicBlockFetcherIterator: uses a custom-built NIO communication layer.
- * NettyBlockFetcherIterator: uses Netty (OIO) as the communication layer.
- *
- * Eventually we would like the two to converge and use a single NIO-based communication
layer,
- * but extensive tests show that under some circumstances (e.g. large shuffles with lots
of cores),
- * NIO would perform poorly and thus the need for the Netty OIO one.
+ * A block fetcher iterator interface for fetching shuffle blocks.
  */
-
 private[storage]
 trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging
{
   def initialize()
@@ -262,67 +254,4 @@ object BlockFetcherIterator {
     }
   }
   // End of BasicBlockFetcherIterator
-
-  class NettyBlockFetcherIterator(
-      blockManager: BlockManager,
-      blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
-      serializer: Serializer,
-      readMetrics: ShuffleReadMetrics)
-    extends BasicBlockFetcherIterator(blockManager, blocksByAddress, serializer, readMetrics)
{
-
-    override protected def sendRequest(req: FetchRequest) {
-      logDebug("Sending request for %d blocks (%s) from %s".format(
-        req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
-      val cmId = new ConnectionManagerId(req.address.host, req.address.port)
-
-      bytesInFlight += req.size
-      val sizeMap = req.blocks.toMap // so we can look up the size of each blockID
-
-      // This could throw a TimeoutException. In that case we will just retry the task.
-      val client = blockManager.nettyBlockClientFactory.createClient(
-        cmId.host, req.address.nettyPort)
-      val blocks = req.blocks.map(_._1.toString)
-
-      client.fetchBlocks(
-        blocks,
-        new BlockClientListener {
-          override def onFetchFailure(blockId: String, errorMsg: String): Unit = {
-            logError(s"Could not get block(s) from $cmId with error: $errorMsg")
-            for ((blockId, size) <- req.blocks) {
-              results.put(new FetchResult(blockId, -1, null))
-            }
-          }
-
-          override def onFetchSuccess(blockId: String, data: ReferenceCountedBuffer): Unit
= {
-            // Increment the reference count so the buffer won't be recycled.
-            // TODO: This could result in memory leaks when the task is stopped due to exception
-            // before the iterator is exhausted.
-            data.retain()
-            val buf = data.byteBuffer()
-            val blockSize = buf.remaining()
-            val bid = BlockId(blockId)
-
-            // TODO: remove code duplication between here and BlockManager.dataDeserialization.
-            results.put(new FetchResult(bid, sizeMap(bid), () => {
-              def createIterator: Iterator[Any] = {
-                val stream = blockManager.wrapForCompression(bid, data.inputStream())
-                serializer.newInstance().deserializeStream(stream).asIterator
-              }
-              new LazyInitIterator(createIterator) {
-                // Release the buffer when we are done traversing it.
-                override def close(): Unit = data.release()
-              }
-            }))
-
-            readMetrics.synchronized {
-              readMetrics.remoteBytesRead += blockSize
-              readMetrics.remoteBlocksFetched += 1
-            }
-            logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
-          }
-        }
-      )
-    }
-  }
-  // End of NettyBlockFetcherIterator
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/be53c54b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 12a92d4..1eb622c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -32,8 +32,6 @@ import org.apache.spark._
 import org.apache.spark.executor._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.network._
-import org.apache.spark.network.netty.client.BlockFetchingClientFactory
-import org.apache.spark.network.netty.server.BlockServer
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.util._
@@ -90,27 +88,8 @@ private[spark] class BlockManager(
     new TachyonStore(this, tachyonBlockManager)
   }
 
-  private val useNetty = conf.getBoolean("spark.shuffle.use.netty", false)
-
-  // If we use Netty for shuffle, start a new Netty-based shuffle sender service.
-  private[storage] val nettyBlockClientFactory: BlockFetchingClientFactory = {
-    if (useNetty) new BlockFetchingClientFactory(conf) else null
-  }
-
-  private val nettyBlockServer: BlockServer = {
-    if (useNetty) {
-      val server = new BlockServer(conf, this)
-      logInfo(s"Created NettyBlockServer binding to port: ${server.port}")
-      server
-    } else {
-      null
-    }
-  }
-
-  private val nettyPort: Int = if (useNetty) nettyBlockServer.port else 0
-
   val blockManagerId = BlockManagerId(
-    executorId, connectionManager.id.host, connectionManager.id.port, nettyPort)
+    executorId, connectionManager.id.host, connectionManager.id.port)
 
   // Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory
   // for receiving shuffle outputs)
@@ -572,14 +551,8 @@ private[spark] class BlockManager(
       blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
       serializer: Serializer,
       readMetrics: ShuffleReadMetrics): BlockFetcherIterator = {
-    val iter =
-      if (conf.getBoolean("spark.shuffle.use.netty", false)) {
-        new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer,
-          readMetrics)
-      } else {
-        new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer,
-          readMetrics)
-      }
+    val iter = new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress,
serializer,
+      readMetrics)
     iter.initialize()
     iter
   }
@@ -1092,14 +1065,6 @@ private[spark] class BlockManager(
     connectionManager.stop()
     shuffleBlockManager.stop()
     diskBlockManager.stop()
-
-    if (nettyBlockClientFactory != null) {
-      nettyBlockClientFactory.stop()
-    }
-    if (nettyBlockServer != null) {
-      nettyBlockServer.stop()
-    }
-
     actorSystem.stop(slaveActor)
     blockInfo.clear()
     memoryStore.clear()

http://git-wip-us.apache.org/repos/asf/spark/blob/be53c54b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index b1585bd..b7bcb2d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -36,11 +36,10 @@ import org.apache.spark.util.Utils
 class BlockManagerId private (
     private var executorId_ : String,
     private var host_ : String,
-    private var port_ : Int,
-    private var nettyPort_ : Int
+    private var port_ : Int
   ) extends Externalizable {
 
-  private def this() = this(null, null, 0, 0)  // For deserialization only
+  private def this() = this(null, null, 0)  // For deserialization only
 
   def executorId: String = executorId_
 
@@ -60,32 +59,28 @@ class BlockManagerId private (
 
   def port: Int = port_
 
-  def nettyPort: Int = nettyPort_
-
   override def writeExternal(out: ObjectOutput) {
     out.writeUTF(executorId_)
     out.writeUTF(host_)
     out.writeInt(port_)
-    out.writeInt(nettyPort_)
   }
 
   override def readExternal(in: ObjectInput) {
     executorId_ = in.readUTF()
     host_ = in.readUTF()
     port_ = in.readInt()
-    nettyPort_ = in.readInt()
   }
 
   @throws(classOf[IOException])
   private def readResolve(): Object = BlockManagerId.getCachedBlockManagerId(this)
 
-  override def toString = "BlockManagerId(%s, %s, %d, %d)".format(executorId, host, port,
nettyPort)
+  override def toString = s"BlockManagerId($executorId, $host, $port)"
 
-  override def hashCode: Int = (executorId.hashCode * 41 + host.hashCode) * 41 + port + nettyPort
+  override def hashCode: Int = (executorId.hashCode * 41 + host.hashCode) * 41 + port
 
   override def equals(that: Any) = that match {
     case id: BlockManagerId =>
-      executorId == id.executorId && port == id.port && host == id.host &&
nettyPort == id.nettyPort
+      executorId == id.executorId && port == id.port && host == id.host
     case _ =>
       false
   }
@@ -100,11 +95,10 @@ private[spark] object BlockManagerId {
    * @param execId ID of the executor.
    * @param host Host name of the block manager.
    * @param port Port of the block manager.
-   * @param nettyPort Optional port for the Netty-based shuffle sender.
    * @return A new [[org.apache.spark.storage.BlockManagerId]].
    */
-  def apply(execId: String, host: String, port: Int, nettyPort: Int) =
-    getCachedBlockManagerId(new BlockManagerId(execId, host, port, nettyPort))
+  def apply(execId: String, host: String, port: Int) =
+    getCachedBlockManagerId(new BlockManagerId(execId, host, port))
 
   def apply(in: ObjectInput) = {
     val obj = new BlockManagerId()

http://git-wip-us.apache.org/repos/asf/spark/blob/be53c54b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index db73847..a754345 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -295,8 +295,7 @@ private[spark] object JsonProtocol {
   def blockManagerIdToJson(blockManagerId: BlockManagerId): JValue = {
     ("Executor ID" -> blockManagerId.executorId) ~
     ("Host" -> blockManagerId.host) ~
-    ("Port" -> blockManagerId.port) ~
-    ("Netty Port" -> blockManagerId.nettyPort)
+    ("Port" -> blockManagerId.port)
   }
 
   def jobResultToJson(jobResult: JobResult): JValue = {
@@ -644,8 +643,7 @@ private[spark] object JsonProtocol {
     val executorId = (json \ "Executor ID").extract[String]
     val host = (json \ "Host").extract[String]
     val port = (json \ "Port").extract[Int]
-    val nettyPort = (json \ "Netty Port").extract[Int]
-    BlockManagerId(executorId, host, port, nettyPort)
+    BlockManagerId(executorId, host, port)
   }
 
   def jobResultFromJson(json: JValue): JobResult = {

http://git-wip-us.apache.org/repos/asf/spark/blob/be53c54b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 9702838..5369169 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -69,13 +69,13 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     val compressedSize10000 = MapOutputTracker.compressSize(10000L)
     val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
     val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
-    tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000, 0),
+    tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000),
         Array(compressedSize1000, compressedSize10000)))
-    tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000, 0),
+    tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000),
         Array(compressedSize10000, compressedSize1000)))
     val statuses = tracker.getServerStatuses(10, 0)
-    assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000, 0), size1000),
-                                  (BlockManagerId("b", "hostB", 1000, 0), size10000)))
+    assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000), size1000),
+                                  (BlockManagerId("b", "hostB", 1000), size10000)))
     tracker.stop()
   }
 
@@ -86,9 +86,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     tracker.registerShuffle(10, 2)
     val compressedSize1000 = MapOutputTracker.compressSize(1000L)
     val compressedSize10000 = MapOutputTracker.compressSize(10000L)
-    tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000, 0),
+    tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000),
       Array(compressedSize1000, compressedSize10000)))
-    tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000, 0),
+    tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000),
       Array(compressedSize10000, compressedSize1000)))
     assert(tracker.containsShuffle(10))
     assert(tracker.getServerStatuses(10, 0).nonEmpty)
@@ -105,14 +105,14 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext
{
     tracker.registerShuffle(10, 2)
     val compressedSize1000 = MapOutputTracker.compressSize(1000L)
     val compressedSize10000 = MapOutputTracker.compressSize(10000L)
-    tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000, 0),
+    tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000),
         Array(compressedSize1000, compressedSize1000, compressedSize1000)))
-    tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000, 0),
+    tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000),
         Array(compressedSize10000, compressedSize1000, compressedSize1000)))
 
     // As if we had two simultaneous fetch failures
-    tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0))
-    tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0))
+    tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000))
+    tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000))
 
     // The remaining reduce task might try to grab the output despite the shuffle failure;
     // this should cause it to fail, and the scheduler will ignore the failure due to the
@@ -145,13 +145,13 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext
{
     val compressedSize1000 = MapOutputTracker.compressSize(1000L)
     val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
     masterTracker.registerMapOutput(10, 0, new MapStatus(
-      BlockManagerId("a", "hostA", 1000, 0), Array(compressedSize1000)))
+      BlockManagerId("a", "hostA", 1000), Array(compressedSize1000)))
     masterTracker.incrementEpoch()
     slaveTracker.updateEpoch(masterTracker.getEpoch)
     assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
-      Seq((BlockManagerId("a", "hostA", 1000, 0), size1000)))
+      Seq((BlockManagerId("a", "hostA", 1000), size1000)))
 
-    masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0))
+    masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000))
     masterTracker.incrementEpoch()
     slaveTracker.updateEpoch(masterTracker.getEpoch)
     intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
@@ -174,7 +174,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     // Frame size should be ~123B, and no exception should be thrown
     masterTracker.registerShuffle(10, 1)
     masterTracker.registerMapOutput(10, 0, new MapStatus(
-      BlockManagerId("88", "mph", 1000, 0), Array.fill[Byte](10)(0)))
+      BlockManagerId("88", "mph", 1000), Array.fill[Byte](10)(0)))
     masterActor.receive(GetMapOutputStatuses(10))
   }
 
@@ -195,7 +195,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     masterTracker.registerShuffle(20, 100)
     (0 until 100).foreach { i =>
       masterTracker.registerMapOutput(20, i, new MapStatus(
-        BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0)))
+        BlockManagerId("999", "mps", 1000), Array.fill[Byte](4000000)(0)))
     }
     intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/be53c54b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index f5fed98..1a42fc1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -736,7 +736,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite"))
with F
    new MapStatus(makeBlockManagerId(host), Array.fill[Byte](reduces)(2))
 
   private def makeBlockManagerId(host: String): BlockManagerId =
-    BlockManagerId("exec-" + host, host, 12345, 0)
+    BlockManagerId("exec-" + host, host, 12345)
 
   private def assertDataStructuresEmpty = {
     assert(scheduler.activeJobs.isEmpty)

http://git-wip-us.apache.org/repos/asf/spark/blob/be53c54b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
index bcbfe8b..1591284 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
@@ -41,7 +41,7 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
     val blockManager = mock(classOf[BlockManager])
     val connManager = mock(classOf[ConnectionManager])
     doReturn(connManager).when(blockManager).connectionManager
-    doReturn(BlockManagerId("test-client", "test-client", 1, 0)).when(blockManager).blockManagerId
+    doReturn(BlockManagerId("test-client", "test-client", 1)).when(blockManager).blockManagerId
 
     doReturn((48 * 1024 * 1024).asInstanceOf[Long]).when(blockManager).maxBytesInFlight
 
@@ -66,7 +66,7 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
     doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(3)), any())
     doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(4)), any())
 
-    val bmId = BlockManagerId("test-client", "test-client",1 , 0)
+    val bmId = BlockManagerId("test-client", "test-client", 1)
     val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
       (bmId, blIds.map(blId => (blId, 1.asInstanceOf[Long])).toSeq)
     )
@@ -97,7 +97,7 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
     val blockManager = mock(classOf[BlockManager])
     val connManager = mock(classOf[ConnectionManager])
     doReturn(connManager).when(blockManager).connectionManager
-    doReturn(BlockManagerId("test-client", "test-client", 1, 0)).when(blockManager).blockManagerId
+    doReturn(BlockManagerId("test-client", "test-client", 1)).when(blockManager).blockManagerId
 
     doReturn((48 * 1024 * 1024).asInstanceOf[Long]).when(blockManager).maxBytesInFlight
 
@@ -117,7 +117,7 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
     doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(3)), any())
     doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(4)), any())
 
-    val bmId = BlockManagerId("test-client", "test-client",1 , 0)
+    val bmId = BlockManagerId("test-client", "test-client", 1)
     val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
       (bmId, blIds.map(blId => (blId, 1.asInstanceOf[Long])).toSeq)
     )
@@ -155,12 +155,12 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
     when(blockManager.futureExecContext).thenReturn(global)
 
     when(blockManager.blockManagerId).thenReturn(
-      BlockManagerId("test-client", "test-client", 1, 0))
+      BlockManagerId("test-client", "test-client", 1))
     when(blockManager.maxBytesInFlight).thenReturn(48 * 1024 * 1024)
 
     val blId1 = ShuffleBlockId(0,0,0)
     val blId2 = ShuffleBlockId(0,1,0)
-    val bmId = BlockManagerId("test-server", "test-server",1 , 0)
+    val bmId = BlockManagerId("test-server", "test-server", 1)
     val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
       (bmId, Seq((blId1, 1L), (blId2, 1L)))
     )
@@ -211,10 +211,10 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
     when(blockManager.futureExecContext).thenReturn(global)
 
     when(blockManager.blockManagerId).thenReturn(
-      BlockManagerId("test-client", "test-client", 1, 0))
+      BlockManagerId("test-client", "test-client", 1))
     when(blockManager.maxBytesInFlight).thenReturn(48 * 1024 * 1024)
 
-    val bmId = BlockManagerId("test-server", "test-server",1 , 0)
+    val bmId = BlockManagerId("test-server", "test-server", 1)
     val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
       (bmId, Seq((blId1, 1L), (blId2, 1L)))
     )

http://git-wip-us.apache.org/repos/asf/spark/blob/be53c54b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index f32ce6f..bdcea07 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -139,9 +139,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("BlockManagerId object caching") {
-    val id1 = BlockManagerId("e1", "XXX", 1, 0)
-    val id2 = BlockManagerId("e1", "XXX", 1, 0) // this should return the same object as
id1
-    val id3 = BlockManagerId("e1", "XXX", 2, 0) // this should return a different object
+    val id1 = BlockManagerId("e1", "XXX", 1)
+    val id2 = BlockManagerId("e1", "XXX", 1) // this should return the same object as id1
+    val id3 = BlockManagerId("e1", "XXX", 2) // this should return a different object
     assert(id2 === id1, "id2 is not same as id1")
     assert(id2.eq(id1), "id2 is not the same object as id1")
     assert(id3 != id1, "id3 is same as id1")

http://git-wip-us.apache.org/repos/asf/spark/blob/be53c54b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
index 7671cb9..4e022a6 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
@@ -26,8 +26,8 @@ import org.apache.spark.scheduler._
  * Test the behavior of StorageStatusListener in response to all relevant events.
  */
 class StorageStatusListenerSuite extends FunSuite {
-  private val bm1 = BlockManagerId("big", "dog", 1, 1)
-  private val bm2 = BlockManagerId("fat", "duck", 2, 2)
+  private val bm1 = BlockManagerId("big", "dog", 1)
+  private val bm2 = BlockManagerId("fat", "duck", 2)
   private val taskInfo1 = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
   private val taskInfo2 = new TaskInfo(0, 0, 0, 0, "fat", "duck", TaskLocality.ANY, false)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/be53c54b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
index 38678bb..ef5c55f 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
@@ -27,7 +27,7 @@ class StorageSuite extends FunSuite {
 
   // For testing add, update, and remove (for non-RDD blocks)
   private def storageStatus1: StorageStatus = {
-    val status = new StorageStatus(BlockManagerId("big", "dog", 1, 1), 1000L)
+    val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L)
     assert(status.blocks.isEmpty)
     assert(status.rddBlocks.isEmpty)
     assert(status.memUsed === 0L)
@@ -78,7 +78,7 @@ class StorageSuite extends FunSuite {
 
   // For testing add, update, remove, get, and contains etc. for both RDD and non-RDD blocks
   private def storageStatus2: StorageStatus = {
-    val status = new StorageStatus(BlockManagerId("big", "dog", 1, 1), 1000L)
+    val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L)
     assert(status.rddBlocks.isEmpty)
     status.addBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 10L, 20L, 0L))
     status.addBlock(TestBlockId("man"), BlockStatus(memAndDisk, 10L, 20L, 0L))
@@ -271,9 +271,9 @@ class StorageSuite extends FunSuite {
 
   // For testing StorageUtils.updateRddInfo and StorageUtils.getRddBlockLocations
   private def stockStorageStatuses: Seq[StorageStatus] = {
-    val status1 = new StorageStatus(BlockManagerId("big", "dog", 1, 1), 1000L)
-    val status2 = new StorageStatus(BlockManagerId("fat", "duck", 2, 2), 2000L)
-    val status3 = new StorageStatus(BlockManagerId("fat", "cat", 3, 3), 3000L)
+    val status1 = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L)
+    val status2 = new StorageStatus(BlockManagerId("fat", "duck", 2), 2000L)
+    val status3 = new StorageStatus(BlockManagerId("fat", "cat", 3), 3000L)
     status1.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L, 0L))
     status1.addBlock(RDDBlockId(0, 1), BlockStatus(memAndDisk, 1L, 2L, 0L))
     status2.addBlock(RDDBlockId(0, 2), BlockStatus(memAndDisk, 1L, 2L, 0L))

http://git-wip-us.apache.org/repos/asf/spark/blob/be53c54b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index a537c72..d9e9c70 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -39,7 +39,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
   private def rddInfo1 = new RDDInfo(1, "hostage", 200, memOnly)
   private def rddInfo2 = new RDDInfo(2, "sanity", 300, memAndDisk)
   private def rddInfo3 = new RDDInfo(3, "grace", 400, memAndDisk)
-  private val bm1 = BlockManagerId("big", "dog", 1, 1)
+  private val bm1 = BlockManagerId("big", "dog", 1)
 
   before {
     bus = new LiveListenerBus

http://git-wip-us.apache.org/repos/asf/spark/blob/be53c54b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
index c4765e5..76bf4cf 100644
--- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
@@ -17,13 +17,16 @@
 
 package org.apache.spark.util
 
+import scala.concurrent.Await
+
 import akka.actor._
+
+import org.scalatest.FunSuite
+
 import org.apache.spark._
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.storage.BlockManagerId
-import org.scalatest.FunSuite
 
-import scala.concurrent.Await
 
 /**
   * Test the AkkaUtils with various security settings.
@@ -35,7 +38,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
     conf.set("spark.authenticate", "true")
     conf.set("spark.authenticate.secret", "good")
 
-    val securityManager = new SecurityManager(conf);
+    val securityManager = new SecurityManager(conf)
     val hostname = "localhost"
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
       conf = conf, securityManager = securityManager)
@@ -106,13 +109,13 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
     val compressedSize1000 = MapOutputTracker.compressSize(1000L)
     val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
     masterTracker.registerMapOutput(10, 0, new MapStatus(
-      BlockManagerId("a", "hostA", 1000, 0), Array(compressedSize1000)))
+      BlockManagerId("a", "hostA", 1000), Array(compressedSize1000)))
     masterTracker.incrementEpoch()
     slaveTracker.updateEpoch(masterTracker.getEpoch)
 
     // this should succeed since security off
     assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
-           Seq((BlockManagerId("a", "hostA", 1000, 0), size1000)))
+           Seq((BlockManagerId("a", "hostA", 1000), size1000)))
 
     actorSystem.shutdown()
     slaveSystem.shutdown()
@@ -157,13 +160,13 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
     val compressedSize1000 = MapOutputTracker.compressSize(1000L)
     val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
     masterTracker.registerMapOutput(10, 0, new MapStatus(
-      BlockManagerId("a", "hostA", 1000, 0), Array(compressedSize1000)))
+      BlockManagerId("a", "hostA", 1000), Array(compressedSize1000)))
     masterTracker.incrementEpoch()
     slaveTracker.updateEpoch(masterTracker.getEpoch)
 
     // this should succeed since security on and passwords match
     assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
-           Seq((BlockManagerId("a", "hostA", 1000, 0), size1000)))
+           Seq((BlockManagerId("a", "hostA", 1000), size1000)))
 
     actorSystem.shutdown()
     slaveSystem.shutdown()

http://git-wip-us.apache.org/repos/asf/spark/blob/be53c54b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 2fd3b9c..66a17de 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -53,9 +53,9 @@ class JsonProtocolSuite extends FunSuite {
       "Classpath Entries" -> Seq(("Super library", "/tmp/super_library"))
     ))
     val blockManagerAdded = SparkListenerBlockManagerAdded(
-      BlockManagerId("Stars", "In your multitude...", 300, 400), 500)
+      BlockManagerId("Stars", "In your multitude...", 300), 500)
     val blockManagerRemoved = SparkListenerBlockManagerRemoved(
-      BlockManagerId("Scarce", "to be counted...", 100, 200))
+      BlockManagerId("Scarce", "to be counted...", 100))
     val unpersistRdd = SparkListenerUnpersistRDD(12345)
     val applicationStart = SparkListenerApplicationStart("The winner of all", 42L, "Garfield")
     val applicationEnd = SparkListenerApplicationEnd(42L)
@@ -81,7 +81,7 @@ class JsonProtocolSuite extends FunSuite {
     testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
     testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
     testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput
= false))
-    testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000))
+    testBlockManagerId(BlockManagerId("Hong", "Kong", 500))
 
     // StorageLevel
     testStorageLevel(StorageLevel.NONE)
@@ -104,7 +104,7 @@ class JsonProtocolSuite extends FunSuite {
     testJobResult(jobFailed)
 
     // TaskEndReason
-    val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15, 16), 17, 18,
19)
+    val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19)
     val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, None)
     testTaskEndReason(Success)
     testTaskEndReason(Resubmitted)
@@ -343,7 +343,6 @@ class JsonProtocolSuite extends FunSuite {
     assert(bm1.executorId === bm2.executorId)
     assert(bm1.host === bm2.host)
     assert(bm1.port === bm2.port)
-    assert(bm1.nettyPort === bm2.nettyPort)
   }
 
   private def assertEquals(result1: JobResult, result2: JobResult) {
@@ -944,8 +943,7 @@ class JsonProtocolSuite extends FunSuite {
       |  "Block Manager ID": {
       |    "Executor ID": "Stars",
       |    "Host": "In your multitude...",
-      |    "Port": 300,
-      |    "Netty Port": 400
+      |    "Port": 300
       |  },
       |  "Maximum Memory": 500
       |}
@@ -958,8 +956,7 @@ class JsonProtocolSuite extends FunSuite {
       |  "Block Manager ID": {
       |    "Executor ID": "Scarce",
       |    "Host": "to be counted...",
-      |    "Port": 100,
-      |    "Netty Port": 200
+      |    "Port": 100
       |  }
       |}
     """


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message