spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zsxw...@apache.org
Subject spark git commit: [SPARK-16017][CORE] Send hostname from CoarseGrainedExecutorBackend to driver
Date Fri, 17 Jun 2016 22:48:20 GMT
Repository: spark
Updated Branches:
  refs/heads/master 298c4ae81 -> 62d8fe208


[SPARK-16017][CORE] Send hostname from CoarseGrainedExecutorBackend to driver

## What changes were proposed in this pull request?

[SPARK-15395](https://issues.apache.org/jira/browse/SPARK-15395) changed how the driver gets
the executor host: the driver now gets the executor's IP address instead of its host name.
This PR sends the hostname from the executors to the driver so that the driver can pass it
to TaskScheduler.

## How was this patch tested?

Existing unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13741 from zsxwing/SPARK-16017.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62d8fe20
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62d8fe20
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62d8fe20

Branch: refs/heads/master
Commit: 62d8fe2089659e8212753a622708517e0f4a77bc
Parents: 298c4ae
Author: Shixiong Zhu <shixiong@databricks.com>
Authored: Fri Jun 17 15:48:17 2016 -0700
Committer: Shixiong Zhu <shixiong@databricks.com>
Committed: Fri Jun 17 15:48:17 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/executor/CoarseGrainedExecutorBackend.scala  | 7 ++++---
 .../spark/scheduler/cluster/CoarseGrainedClusterMessage.scala | 4 ++--
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala     | 6 +++---
 .../test/scala/org/apache/spark/HeartbeatReceiverSuite.scala  | 4 ++--
 .../spark/deploy/StandaloneDynamicAllocationSuite.scala       | 2 +-
 5 files changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/62d8fe20/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index e087295..ccc6c36 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -39,6 +39,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     override val rpcEnv: RpcEnv,
     driverUrl: String,
     executorId: String,
+    hostname: String,
     cores: Int,
     userClassPath: Seq[URL],
     env: SparkEnv)
@@ -57,7 +58,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       driver = Some(ref)
-      ref.ask[Boolean](RegisterExecutor(executorId, self, cores, extractLogUrls))
+      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
     }(ThreadUtils.sameThread).onComplete {
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       case Success(msg) =>
@@ -75,7 +76,7 @@ private[spark] class CoarseGrainedExecutorBackend(
   }
 
   override def receive: PartialFunction[Any, Unit] = {
-    case RegisteredExecutor(hostname) =>
+    case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
       executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
 
@@ -201,7 +202,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         driverConf, executorId, hostname, port, cores, isLocal = false)
 
       env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
-        env.rpcEnv, driverUrl, executorId, cores, userClassPath, env))
+        env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
       workerUrl.foreach { url =>
         env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/62d8fe20/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 46a8291..edc8aac 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -40,8 +40,7 @@ private[spark] object CoarseGrainedClusterMessages {
 
   sealed trait RegisterExecutorResponse
 
-  case class RegisteredExecutor(hostname: String) extends CoarseGrainedClusterMessage
-    with RegisterExecutorResponse
+  case object RegisteredExecutor extends CoarseGrainedClusterMessage with RegisterExecutorResponse
 
   case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
     with RegisterExecutorResponse
@@ -50,6 +49,7 @@ private[spark] object CoarseGrainedClusterMessages {
   case class RegisterExecutor(
       executorId: String,
       executorRef: RpcEndpointRef,
+      hostname: String,
       cores: Int,
       logUrls: Map[String, String])
     extends CoarseGrainedClusterMessage

http://git-wip-us.apache.org/repos/asf/spark/blob/62d8fe20/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index e84cb63..967c4d5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -148,7 +148,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
 
-      case RegisterExecutor(executorId, executorRef, cores, logUrls) =>
+      case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
         if (executorDataMap.contains(executorId)) {
           executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
           context.reply(true)
@@ -164,7 +164,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           addressToExecutorId(executorAddress) = executorId
           totalCoreCount.addAndGet(cores)
           totalRegisteredExecutors.addAndGet(1)
-          val data = new ExecutorData(executorRef, executorRef.address, executorAddress.host,
+          val data = new ExecutorData(executorRef, executorRef.address, hostname,
             cores, cores, logUrls)
           // This must be synchronized because variables mutated
           // in this block are read when requesting executors
@@ -178,7 +178,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
               logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
             }
           }
-          executorRef.send(RegisteredExecutor(executorAddress.host))
+          executorRef.send(RegisteredExecutor)
           // Note: some tests expect the reply to come after we put the executor in the map
           context.reply(true)
           listenerBus.post(

http://git-wip-us.apache.org/repos/asf/spark/blob/62d8fe20/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 81b94b5..5e2ba31 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -174,9 +174,9 @@ class HeartbeatReceiverSuite
     val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
     val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2)
     fakeSchedulerBackend.driverEndpoint.askWithRetry[Boolean](
-      RegisterExecutor(executorId1, dummyExecutorEndpointRef1, 0, Map.empty))
+      RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty))
     fakeSchedulerBackend.driverEndpoint.askWithRetry[Boolean](
-      RegisterExecutor(executorId2, dummyExecutorEndpointRef2, 0, Map.empty))
+      RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "1.2.3.5", 0, Map.empty))
     heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
     addExecutorAndVerify(executorId1)
     addExecutorAndVerify(executorId2)

http://git-wip-us.apache.org/repos/asf/spark/blob/62d8fe20/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 3d39bd4..8140270 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -559,7 +559,7 @@ class StandaloneDynamicAllocationSuite
       val endpointRef = mock(classOf[RpcEndpointRef])
       val mockAddress = mock(classOf[RpcAddress])
       when(endpointRef.address).thenReturn(mockAddress)
-      val message = RegisterExecutor(id, endpointRef, 10, Map.empty)
+      val message = RegisterExecutor(id, endpointRef, "localhost", 10, Map.empty)
       val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
       backend.driverEndpoint.askWithRetry[Boolean](message)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message