spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zsxw...@apache.org
Subject spark git commit: [SPARK-14437][CORE] Use the address that NettyBlockTransferService listens to create BlockManagerId
Date Sat, 09 Apr 2016 00:18:23 GMT
Repository: spark
Updated Branches:
  refs/heads/master 906eef4c7 -> 4d7c35926


[SPARK-14437][CORE] Use the address that NettyBlockTransferService listens to create BlockManagerId

## What changes were proposed in this pull request?

Here is why SPARK-14437 happens:
BlockManagerId is created using NettyBlockTransferService.hostName which comes from `customHostname`.
And `Executor` will set `customHostname` to the hostname which is detected by the driver.
However, the driver may not be able to detect the correct address in some complicated network
setups (Netty's Channel.remoteAddress doesn't always return a connectable address). In such a case,
`BlockManagerId` will be created using a wrong hostname.

To fix this issue, this PR uses `hostname` provided by `SparkEnv.create` to create `NettyBlockTransferService`
and sets `NettyBlockTransferService.hostName` to this one directly. A bonus of this approach
is that NettyBlockTransferService won't bind to `0.0.0.0`, which is much safer.

## How was this patch tested?

Manually checked the bound address using local-cluster.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #12240 from zsxwing/SPARK-14437.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4d7c3592
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4d7c3592
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4d7c3592

Branch: refs/heads/master
Commit: 4d7c35926371f9e016577987c037abcf984443d9
Parents: 906eef4
Author: Shixiong Zhu <shixiong@databricks.com>
Authored: Fri Apr 8 17:18:19 2016 -0700
Committer: Shixiong Zhu <shixiong@databricks.com>
Committed: Fri Apr 8 17:18:19 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/SparkEnv.scala     |  3 ++-
 .../spark/network/netty/NettyBlockTransferService.scala | 12 +++++++-----
 .../network/netty/NettyBlockTransferSecuritySuite.scala |  4 ++--
 .../network/netty/NettyBlockTransferServiceSuite.scala  |  2 +-
 .../spark/storage/BlockManagerReplicationSuite.scala    |  2 +-
 .../org/apache/spark/storage/BlockManagerSuite.scala    |  6 +++---
 project/MimaExcludes.scala                              |  3 +++
 .../spark/streaming/ReceivedBlockHandlerSuite.scala     |  2 +-
 8 files changed, 20 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4d7c3592/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 700e2cb..ab89f4c 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -314,7 +314,8 @@ object SparkEnv extends Logging {
         UnifiedMemoryManager(conf, numUsableCores)
       }
 
-    val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)
+    val blockTransferService =
+      new NettyBlockTransferService(conf, securityManager, hostname, numUsableCores)
 
     val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
       BlockManagerMaster.DRIVER_ENDPOINT_NAME,

http://git-wip-us.apache.org/repos/asf/spark/blob/4d7c3592/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 5f3d453..33a3219 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -39,7 +39,11 @@ import org.apache.spark.util.Utils
 /**
  * A BlockTransferService that uses Netty to fetch a set of blocks at at time.
  */
-class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager, numCores:
Int)
+private[spark] class NettyBlockTransferService(
+    conf: SparkConf,
+    securityManager: SecurityManager,
+    override val hostName: String,
+    numCores: Int)
   extends BlockTransferService {
 
   // TODO: Don't use Java serialization, use a more cross-version compatible serialization
format.
@@ -65,13 +69,13 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
     clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava)
     server = createServer(serverBootstrap.toList)
     appId = conf.getAppId
-    logInfo("Server created on " + server.getPort)
+    logInfo(s"Server created on ${hostName}:${server.getPort}")
   }
 
   /** Creates and binds the TransportServer, possibly trying multiple ports. */
   private def createServer(bootstraps: List[TransportServerBootstrap]): TransportServer =
{
     def startService(port: Int): (TransportServer, Int) = {
-      val server = transportContext.createServer(port, bootstraps.asJava)
+      val server = transportContext.createServer(hostName, port, bootstraps.asJava)
       (server, server.getPort)
     }
 
@@ -109,8 +113,6 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
     }
   }
 
-  override def hostName: String = Utils.localHostName()
-
   override def port: Int = server.getPort
 
   override def uploadBlock(

http://git-wip-us.apache.org/repos/asf/spark/blob/4d7c3592/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
index 6da18cf..ed15e77 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
@@ -108,11 +108,11 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar
wi
     when(blockManager.getBlockData(blockId)).thenReturn(blockBuffer)
 
     val securityManager0 = new SecurityManager(conf0)
-    val exec0 = new NettyBlockTransferService(conf0, securityManager0, numCores = 1)
+    val exec0 = new NettyBlockTransferService(conf0, securityManager0, "localhost", numCores
= 1)
     exec0.init(blockManager)
 
     val securityManager1 = new SecurityManager(conf1)
-    val exec1 = new NettyBlockTransferService(conf1, securityManager1, numCores = 1)
+    val exec1 = new NettyBlockTransferService(conf1, securityManager1, "localhost", numCores
= 1)
     exec1.init(blockManager)
 
     val result = fetchBlock(exec0, exec1, "1", blockId) match {

http://git-wip-us.apache.org/repos/asf/spark/blob/4d7c3592/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
index cc1a9e0..f3c156e 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
@@ -80,7 +80,7 @@ class NettyBlockTransferServiceSuite
       .set("spark.blockManager.port", port.toString)
     val securityManager = new SecurityManager(conf)
     val blockDataManager = mock(classOf[BlockDataManager])
-    val service = new NettyBlockTransferService(conf, securityManager, numCores = 1)
+    val service = new NettyBlockTransferService(conf, securityManager, "localhost", numCores
= 1)
     service.init(blockDataManager)
     service
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/4d7c3592/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 2ec5319..d26df7e 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -62,7 +62,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with
Befo
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     conf.set("spark.testing.memory", maxMem.toString)
     conf.set("spark.memory.offHeap.size", maxMem.toString)
-    val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
+    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", numCores
= 1)
     val memManager = UnifiedMemoryManager(conf, numCores = 1)
     val serializerManager = new SerializerManager(serializer, conf)
     val store = new BlockManager(name, rpcEnv, master, serializerManager, conf,

http://git-wip-us.apache.org/repos/asf/spark/blob/4d7c3592/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 66b28de..a1c2933 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -78,7 +78,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     conf.set("spark.memory.offHeap.size", maxMem.toString)
     val serializer = new KryoSerializer(conf)
     val transfer = transferService
-      .getOrElse(new NettyBlockTransferService(conf, securityMgr, numCores = 1))
+      .getOrElse(new NettyBlockTransferService(conf, securityMgr, "localhost", numCores =
1))
     val memManager = UnifiedMemoryManager(conf, numCores = 1)
     val serializerManager = new SerializerManager(serializer, conf)
     val blockManager = new BlockManager(name, rpcEnv, master, serializerManager, conf,
@@ -490,7 +490,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val blockManager = makeBlockManager(128, "exec", bmMaster)
     val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations)
     val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0))
-    assert(locations.map(_.host) === Seq(localHost, localHost, otherHost))
+    assert(locations.map(_.host).toSet === Set(localHost, localHost, otherHost))
   }
 
   test("SPARK-9591: getRemoteBytes from another location when Exception throw") {
@@ -852,7 +852,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   test("block store put failure") {
     // Use Java serializer so we can create an unserializable error.
     conf.set("spark.testing.memory", "1200")
-    val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
+    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", numCores
= 1)
     val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
     val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)
     store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,

http://git-wip-us.apache.org/repos/asf/spark/blob/4d7c3592/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a53161d..f240c30 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -615,6 +615,9 @@ object MimaExcludes {
         // [SPARK-13430][ML] moved featureCol from LinearRegressionModelSummary to LinearRegressionSummary
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.ml.regression.LinearRegressionSummary.this")
       ) ++ Seq(
+        // [SPARK-14437][Core] Use the address that NettyBlockTransferService listens to
create BlockManagerId
+        ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockTransferService.this")
+      ) ++ Seq(
         // [SPARK-13048][ML][MLLIB] keepLastCheckpoint option for LDA EM optimizer
         ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.clustering.DistributedLDAModel.this")
       )

http://git-wip-us.apache.org/repos/asf/spark/blob/4d7c3592/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 5fc53bc..39d0de5 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -266,7 +266,7 @@ class ReceivedBlockHandlerSuite
       conf: SparkConf,
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
-    val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
+    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", numCores
= 1)
     val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializerManager,
conf,
       memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
     memManager.setMemoryStore(blockManager.memoryStore)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message