spark-commits mailing list archives

From ma...@apache.org
Subject [2/3] git commit: fixing comments on PR
Date Sat, 26 Oct 2013 01:28:49 GMT
fixing comments on PR


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/eef261c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/eef261c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/eef261c8

Branch: refs/heads/master
Commit: eef261c89286ddbcdcc03684c1a5d0b94d6da321
Parents: 05a0df2
Author: Ali Ghodsi <alig@cs.berkeley.edu>
Authored: Thu Oct 24 12:12:11 2013 -0700
Committer: Ali Ghodsi <alig@cs.berkeley.edu>
Committed: Fri Oct 25 16:48:33 2013 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 16 ++++---------
 .../cluster/CoarseGrainedSchedulerBackend.scala |  6 ++---
 .../cluster/SimrSchedulerBackend.scala          | 25 +++++++++-----------
 3 files changed, 18 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/eef261c8/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3b39c97..c9bc01c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -57,20 +57,13 @@ import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
   SparkDeploySchedulerBackend, ClusterScheduler, SimrSchedulerBackend}
-import org.apache.spark.scheduler.local.LocalScheduler
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalScheduler
+import org.apache.spark.scheduler.StageInfo
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType,
   TimeStampedHashMap, Utils}
-import org.apache.spark.scheduler.StageInfo
-import org.apache.spark.storage.RDDInfo
-import org.apache.spark.storage.StorageStatus
-import scala.Some
-import org.apache.spark.scheduler.StageInfo
-import org.apache.spark.storage.RDDInfo
-import org.apache.spark.storage.StorageStatus
 
 /**
 * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -133,7 +126,7 @@ class SparkContext(
   val startTime = System.currentTimeMillis()
 
   // Add each JAR given through the constructor
-  if (jars != null && jars != Seq(null)) {
+  if (jars != null) {
     jars.foreach { addJar(_) }
   }
 
@@ -164,7 +157,7 @@ class SparkContext(
     val SPARK_REGEX = """spark://(.*)""".r
     // Regular expression for connection to Mesos cluster
     val MESOS_REGEX = """mesos://(.*)""".r
-    //Regular expression for connection to Simr cluster
+    // Regular expression for connection to Simr cluster
     val SIMR_REGEX = """simr://(.*)""".r
 
     master match {
@@ -694,8 +687,7 @@ class SparkContext(
    */
   def addJar(path: String) {
     if (path == null) {
-      logWarning("null specified as parameter to addJar",
-        new SparkException("null specified as parameter to addJar"))
+      logWarning("null specified as parameter to addJar")
     } else {
       var key = ""
       if (path.contains("\\")) {
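
The SparkContext hunks above tidy the duplicated imports, drop the redundant jars != Seq(null) guard, and fix the Simr regex comment. For context, SparkContext picks a scheduler backend by pattern matching the master URL against these compiled regexes, so a simr://<driver-file-path> URL routes to SimrSchedulerBackend. A minimal standalone sketch of that dispatch (the object and method names here are illustrative, not the exact SparkContext code):

    object MasterUrl {
      val SPARK_REGEX = """spark://(.*)""".r
      val MESOS_REGEX = """mesos://(.*)""".r
      // Regular expression for connection to Simr cluster
      val SIMR_REGEX  = """simr://(.*)""".r

      // Report which backend family a master URL would select.
      def backendFor(master: String): String = master match {
        case "local"          => "LocalScheduler"
        case SPARK_REGEX(url) => "SparkDeploySchedulerBackend at " + url
        case MESOS_REGEX(url) => "MesosSchedulerBackend at " + url
        case SIMR_REGEX(path) => "SimrSchedulerBackend, driver file " + path
        case _ => throw new IllegalArgumentException("Could not parse Master URL: " + master)
      }
    }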

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/eef261c8/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 80a9b46..70f3f88 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -182,7 +182,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
       if (driverActor != null) {
         logInfo("Shutting down all executors")
         val future = driverActor.ask(StopExecutors)(timeout)
-        Await.result(future, timeout)
+        Await.ready(future, timeout)
       }
     } catch {
       case e: Exception =>
@@ -194,7 +194,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
     try {
       if (driverActor != null) {
         val future = driverActor.ask(StopDriver)(timeout)
-        Await.result(future, timeout)
+        Await.ready(future, timeout)
       }
     } catch {
       case e: Exception =>
@@ -217,7 +217,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
   def removeExecutor(executorId: String, reason: String) {
     try {
       val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout)
-      Await.result(future, timeout)
+      Await.ready(future, timeout)
     } catch {
       case e: Exception =>
         throw new SparkException("Error notifying standalone scheduler's driver actor", e)
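
On the Await.result -> Await.ready changes above: both block the caller for at most `timeout`, but Await.result additionally unwraps the future, returning its value or rethrowing its failure, while Await.ready only waits for the future to complete and returns the future itself. These call sites send stop/remove messages whose reply value is never used, so waiting for completion is all that is needed. A small self-contained illustration with plain scala.concurrent futures (not the actual actor ask code):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    val reply: Future[String] = Future { "executors stopped" }

    // result blocks, then hands back the value (or rethrows a failed future's exception)
    val value: String = Await.result(reply, 5.seconds)

    // ready blocks for completion only; the value stays inside the returned future
    val done: Future[String] = Await.ready(reply, 5.seconds)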

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/eef261c8/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index ae56244..d78bdba 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -1,5 +1,3 @@
-package org.apache.spark.scheduler.cluster
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -17,20 +15,21 @@ package org.apache.spark.scheduler.cluster
  * limitations under the License.
  */
 
+package org.apache.spark.scheduler.cluster
 
-import org.apache.spark.{Logging, SparkContext}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.spark.{Logging, SparkContext}
 
 private[spark] class SimrSchedulerBackend(
-                                           scheduler: ClusterScheduler,
-                                           sc: SparkContext,
-                                           driverFilePath: String)
+    scheduler: ClusterScheduler,
+    sc: SparkContext,
+    driverFilePath: String)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
   with Logging {
 
-  val tmpPath = new Path(driverFilePath + "_tmp");
-  val filePath = new Path(driverFilePath);
+  val tmpPath = new Path(driverFilePath + "_tmp")
+  val filePath = new Path(driverFilePath)
 
   val maxCores = System.getProperty("spark.simr.executor.cores", "1").toInt
 
@@ -44,8 +43,8 @@ private[spark] class SimrSchedulerBackend(
     val conf = new Configuration()
     val fs = FileSystem.get(conf)
 
-    logInfo("Writing to HDFS file: "  + driverFilePath);
-    logInfo("Writing AKKA address: "  + driverUrl);
+    logInfo("Writing to HDFS file: "  + driverFilePath)
+    logInfo("Writing Akka address: "  + driverUrl)
 
     // Create temporary file to prevent race condition where executors get empty driverUrl file
     val temp = fs.create(tmpPath, true)
@@ -54,16 +53,14 @@ private[spark] class SimrSchedulerBackend(
     temp.close()
 
     // "Atomic" rename
-    fs.rename(tmpPath, filePath);
+    fs.rename(tmpPath, filePath)
   }
 
   override def stop() {
     val conf = new Configuration()
     val fs = FileSystem.get(conf)
-    fs.delete(new Path(driverFilePath), false);
+    fs.delete(new Path(driverFilePath), false)
     super.stopExecutors()
     super.stop()
   }
 }
-
-
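
The SimrSchedulerBackend cleanup above preserves its write-then-rename handshake: the driver URL is first written to <driverFilePath>_tmp and only afterwards renamed to the final path, so executors polling the final path never observe a partially written file. A hedged sketch of that publication step against the Hadoop FileSystem API (a standalone method; the writeUTF/writeInt payload is an assumption, since the diff does not show those lines):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    def publishDriverFile(driverFilePath: String, driverUrl: String, maxCores: Int) {
      val fs = FileSystem.get(new Configuration())
      val tmpPath = new Path(driverFilePath + "_tmp")

      // write to a temporary file first so readers never see partial content
      val temp = fs.create(tmpPath, true)
      temp.writeUTF(driverUrl)
      temp.writeInt(maxCores)
      temp.close()

      // publish by renaming; the rename is the "atomic" step on HDFS
      fs.rename(tmpPath, new Path(driverFilePath))
    }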

