spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pwend...@apache.org
Subject [19/50] [abbrv] git commit: Restored master address for client.
Date Sat, 14 Dec 2013 08:42:03 GMT
Restored master address for client.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/560e44a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/560e44a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/560e44a8

Branch: refs/heads/master
Commit: 560e44a8e1d5a2cf42bf640090016f6201c6fbd7
Parents: d092a8c
Author: Prashant Sharma <prashant.s@imaginea.com>
Authored: Tue Nov 26 17:50:29 2013 +0530
Committer: Prashant Sharma <prashant.s@imaginea.com>
Committed: Tue Nov 26 18:18:05 2013 +0530

----------------------------------------------------------------------
 .../scala/org/apache/spark/deploy/client/Client.scala    | 11 +++++++----
 .../scala/org/apache/spark/deploy/master/Master.scala    |  4 ++--
 2 files changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/560e44a8/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 408692e..f60e56d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -27,7 +27,7 @@ import akka.pattern.AskTimeoutException
 import akka.pattern.ask
 import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent, AssociationErrorEvent}
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkException, Logging}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
@@ -49,7 +49,7 @@ private[spark] class Client(
   val REGISTRATION_TIMEOUT = 20.seconds
   val REGISTRATION_RETRIES = 3
 
-  var prevMaster: ActorRef = null // set for unwatching, when it fails.
+  var masterAddress: Address = null
   var actor: ActorRef = null
   var appId: String = null
   var registered = false
@@ -103,11 +103,14 @@ private[spark] class Client(
     def changeMaster(url: String) {
       activeMasterUrl = url
       master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
+      masterAddress = activeMasterUrl match {
+        case Master.sparkUrlRegex(host, port) => Address("akka.tcp", Master.systemName,
host, port.toInt)
+        case x => throw new SparkException("Invalid spark URL:"+x)
+      }
     }
 
     override def receive = {
       case RegisteredApplication(appId_, masterUrl) =>
-        prevMaster = sender
         appId = appId_
         registered = true
         changeMaster(masterUrl)
@@ -137,7 +140,7 @@ private[spark] class Client(
         alreadyDisconnected = false
         sender ! MasterChangeAcknowledged(appId)
 
-      case DisassociatedEvent(_, address, _) => 
+      case DisassociatedEvent(_, address, _) if address == masterAddress =>
         logWarning(s"Connection to $address failed; waiting for master to reconnect...")
         markDisconnected()
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/560e44a8/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 81fb5c4..0e2b461 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -517,9 +517,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends
Act
 }
 
 private[spark] object Master {
-  private val systemName = "sparkMaster"
+  val systemName = "sparkMaster"
   private val actorName = "Master"
-  private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
+  val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
 
   def main(argStrings: Array[String]) {
     val args = new MasterArguments(argStrings)


Mime
View raw message