spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject [07/13] git commit: Change url format to spark://host1:port1,host2:port2
Date Fri, 11 Oct 2013 00:17:14 GMT
Change url format to spark://host1:port1,host2:port2

This replaces the format of spark://host1:port1,spark://host2:port2 and is more
consistent with ZooKeeper's zk:// urls.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/718e8c20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/718e8c20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/718e8c20

Branch: refs/heads/master
Commit: 718e8c20526847657a58ab7ea5e4c86c367ae6d9
Parents: e119022
Author: Aaron Davidson <aaron@databricks.com>
Authored: Sun Oct 6 00:02:08 2013 -0700
Committer: Aaron Davidson <aaron@databricks.com>
Committed: Sun Oct 6 00:02:08 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/SparkContext.scala   |  4 ++--
 .../org/apache/spark/deploy/FaultToleranceTest.scala      | 10 ++++++----
 .../scala/org/apache/spark/deploy/client/Client.scala     |  2 ++
 .../scala/org/apache/spark/deploy/worker/Worker.scala     |  3 +++
 .../org/apache/spark/deploy/worker/WorkerArguments.scala  |  2 +-
 5 files changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/718e8c20/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5318847..b264387 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -153,7 +153,7 @@ class SparkContext(
     // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
     val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
     // Regular expression for connecting to Spark deploy clusters
-    val SPARK_REGEX = """(spark://.*)""".r
+    val SPARK_REGEX = """spark://(.*)""".r
     //Regular expression for connection to Mesos cluster
     val MESOS_REGEX = """(mesos://.*)""".r
 
@@ -169,7 +169,7 @@ class SparkContext(
 
       case SPARK_REGEX(sparkUrl) =>
         val scheduler = new ClusterScheduler(this)
-        val masterUrls = sparkUrl.split(",")
+        val masterUrls = sparkUrl.split(",").map("spark://" + _)
         val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName)
         scheduler.initialize(backend)
         scheduler

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/718e8c20/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index f9e4018..8bac62b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -36,6 +36,8 @@ import org.apache.spark.deploy.master.RecoveryState
 
 /**
  * This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
+ * Execute using
+ * ./spark-class org.apache.spark.deploy.FaultToleranceTest
  *
  * In order to mimic a real distributed cluster more closely, Docker is used.
 * Unfortunately, this dependency means that the suite cannot be run automatically without a
@@ -56,7 +58,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
   assertTrue(sparkHome != null, "Run with a valid SPARK_HOME")
 
   val containerSparkHome = "/opt/spark"
-  val dockerMountString = "%s:%s".format(sparkHome, containerSparkHome)
+  val dockerMountDir = "%s:%s".format(sparkHome, containerSparkHome)
 
   System.setProperty("spark.driver.host", "172.17.42.1") // default docker host ip
 
@@ -172,12 +174,12 @@ private[spark] object FaultToleranceTest extends App with Logging {
   }
 
   def addMasters(num: Int) {
-    (1 to num).foreach { _ => masters += SparkDocker.startMaster(sparkHome) }
+    (1 to num).foreach { _ => masters += SparkDocker.startMaster(dockerMountDir) }
   }
 
   def addWorkers(num: Int) {
     val masterUrls = getMasterUrls(masters)
-    (1 to num).foreach { _ => workers += SparkDocker.startWorker(sparkHome, masterUrls) }
+    (1 to num).foreach { _ => workers += SparkDocker.startWorker(dockerMountDir, masterUrls) }
   }
 
   /** Creates a SparkContext, which constructs a Client to interact with our cluster. */
@@ -190,7 +192,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
   }
 
   def getMasterUrls(masters: Seq[TestMasterInfo]): String = {
-    masters.map(master => "spark://" + master.ip + ":7077").mkString(",")
+    "spark://" + masters.map(master => master.ip + ":7077").mkString(",")
   }
 
   def getLeader: TestMasterInfo = {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/718e8c20/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 198d5ce..0d4682f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -38,6 +38,8 @@ import org.apache.spark.deploy.master.Master
 /**
  * The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description,
  * and a listener for cluster events, and calls back the listener when various events occur.
+ *
+ * @param masterUrls Each url should look like spark://host:port.
  */
 private[spark] class Client(
     actorSystem: ActorSystem,

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/718e8c20/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 25ba756..216d9d4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -35,6 +35,9 @@ import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.{Utils, AkkaUtils}
 
+/**
+  * @param masterUrls Each url should look like spark://host:port.
+  */
 private[spark] class Worker(
     host: String,
     port: Int,

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/718e8c20/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index 16d8686..3ed528e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -89,7 +89,7 @@ private[spark] class WorkerArguments(args: Array[String]) {
       if (masters != null) {  // Two positional arguments were given
         printUsageAndExit(1)
       }
-      masters = value.split(",")
+      masters = value.stripPrefix("spark://").split(",").map("spark://" + _)
       parse(tail)
 
     case Nil =>


Mime
View raw message