From: matei@apache.org
To: commits@spark.incubator.apache.org
Date: Sat, 26 Oct 2013 01:28:49 -0000
Message-Id: <6c22800e795a4a2ab1aef979237bc8e3@git.apache.org>
In-Reply-To: <6c82182e08e545939086fb0dc308cac7@git.apache.org>
References: <6c82182e08e545939086fb0dc308cac7@git.apache.org>
Subject: [2/3] git commit: fixing comments on PR

fixing comments on PR

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/eef261c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/eef261c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/eef261c8

Branch: refs/heads/master
Commit: eef261c89286ddbcdcc03684c1a5d0b94d6da321
Parents: 05a0df2
Author: Ali Ghodsi
Authored: Thu Oct 24 12:12:11 2013 -0700
Committer: Ali Ghodsi
Committed: Fri Oct 25 16:48:33 2013 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 16 ++++---------
 .../cluster/CoarseGrainedSchedulerBackend.scala |  6 ++---
 .../cluster/SimrSchedulerBackend.scala          | 25 +++++++++-----------
 3 files changed, 18 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/eef261c8/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3b39c97..c9bc01c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -57,20 +57,13 @@ import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler, SimrSchedulerBackend}
-import org.apache.spark.scheduler.local.LocalScheduler
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalScheduler
+import org.apache.spark.scheduler.StageInfo
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
-import org.apache.spark.scheduler.StageInfo
-import org.apache.spark.storage.RDDInfo
-import org.apache.spark.storage.StorageStatus
-import scala.Some
-import org.apache.spark.scheduler.StageInfo
-import org.apache.spark.storage.RDDInfo
-import org.apache.spark.storage.StorageStatus
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -133,7 +126,7 @@ class SparkContext(
   val startTime = System.currentTimeMillis()
 
   // Add each JAR given through the constructor
-  if (jars != null && jars != Seq(null)) {
+  if (jars != null) {
     jars.foreach { addJar(_) }
   }
@@ -164,7 +157,7 @@ class SparkContext(
     val SPARK_REGEX = """spark://(.*)""".r
     // Regular expression for connection to Mesos cluster
     val MESOS_REGEX = """mesos://(.*)""".r
-    //Regular expression for connection to Simr cluster
+    // Regular expression for connection to Simr cluster
    val SIMR_REGEX = """simr://(.*)""".r
 
     master match {
@@ -694,8 +687,7 @@ class SparkContext(
    */
   def addJar(path: String) {
     if (path == null) {
-      logWarning("null specified as parameter to addJar",
-        new SparkException("null specified as parameter to addJar"))
+      logWarning("null specified as parameter to addJar")
     } else {
       var key = ""
       if (path.contains("\\")) {
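For context on the SIMR_REGEX hunk above: SparkContext dispatches on the
master URL by matching it against these regexes. A minimal, self-contained
sketch of that dispatch (not part of the commit; the URL and the println
bodies are made up for illustration):

    object MasterUrlDemo {
      def main(args: Array[String]): Unit = {
        // The same patterns the diff touches; only the SIMR comment changed.
        val SPARK_REGEX = """spark://(.*)""".r
        val MESOS_REGEX = """mesos://(.*)""".r
        val SIMR_REGEX = """simr://(.*)""".r

        // "simr://host:10000" is a hypothetical example URL.
        "simr://host:10000" match {
          case SPARK_REGEX(url) => println("standalone master: " + url)
          case MESOS_REGEX(url) => println("Mesos master: " + url)
          case SIMR_REGEX(url)  => println("SIMR driver file: " + url) // this branch fires
          case other            => println("unrecognized master URL: " + other)
        }
      }
    }
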
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/eef261c8/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 80a9b46..70f3f88 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -182,7 +182,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
       if (driverActor != null) {
         logInfo("Shutting down all executors")
         val future = driverActor.ask(StopExecutors)(timeout)
-        Await.result(future, timeout)
+        Await.ready(future, timeout)
       }
     } catch {
       case e: Exception =>
@@ -194,7 +194,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
     try {
       if (driverActor != null) {
         val future = driverActor.ask(StopDriver)(timeout)
-        Await.result(future, timeout)
+        Await.ready(future, timeout)
       }
     } catch {
       case e: Exception =>
@@ -217,7 +217,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
   def removeExecutor(executorId: String, reason: String) {
     try {
       val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout)
-      Await.result(future, timeout)
+      Await.ready(future, timeout)
     } catch {
       case e: Exception =>
         throw new SparkException("Error notifying standalone scheduler's driver actor", e)
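The three Await.result -> Await.ready changes above all make the same trade:
during shutdown and cleanup the backend only needs to block until the ask
completes, not rethrow its failure. A minimal sketch of the difference, with
a plain Future standing in for the driver-actor ask (not code from this
commit):

    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration._
    import ExecutionContext.Implicits.global

    object AwaitDemo {
      def main(args: Array[String]): Unit = {
        // A reply that fails, as a driver-actor ask might during teardown.
        val reply: Future[String] = Future { throw new RuntimeException("boom") }

        // Await.ready blocks until the future completes (or the timeout
        // expires) and returns the future itself; a failed result is NOT
        // rethrown, so cleanup can continue.
        val done = Await.ready(reply, 5.seconds)
        println(done.value) // Some(Failure(java.lang.RuntimeException: boom))

        // Await.result would instead rethrow the RuntimeException here:
        // Await.result(reply, 5.seconds)
      }
    }
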
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/eef261c8/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index ae56244..d78bdba 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -1,5 +1,3 @@
-package org.apache.spark.scheduler.cluster
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -17,20 +15,21 @@ package org.apache.spark.scheduler.cluster
  * limitations under the License.
  */
 
+package org.apache.spark.scheduler.cluster
 
-import org.apache.spark.{Logging, SparkContext}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.spark.{Logging, SparkContext}
 
 private[spark] class SimrSchedulerBackend(
-  scheduler: ClusterScheduler,
-  sc: SparkContext,
-  driverFilePath: String)
+    scheduler: ClusterScheduler,
+    sc: SparkContext,
+    driverFilePath: String)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
   with Logging {
 
-  val tmpPath = new Path(driverFilePath + "_tmp");
-  val filePath = new Path(driverFilePath);
+  val tmpPath = new Path(driverFilePath + "_tmp")
+  val filePath = new Path(driverFilePath)
 
   val maxCores = System.getProperty("spark.simr.executor.cores", "1").toInt
@@ -44,8 +43,8 @@ private[spark] class SimrSchedulerBackend(
     val conf = new Configuration()
     val fs = FileSystem.get(conf)
 
-    logInfo("Writing to HDFS file: " + driverFilePath);
-    logInfo("Writing AKKA address: " + driverUrl);
+    logInfo("Writing to HDFS file: " + driverFilePath)
+    logInfo("Writing Akka address: " + driverUrl)
 
     // Create temporary file to prevent race condition where executors get empty driverUrl file
     val temp = fs.create(tmpPath, true)
@@ -54,16 +53,14 @@ private[spark] class SimrSchedulerBackend(
     temp.close()
 
     // "Atomic" rename
-    fs.rename(tmpPath, filePath);
+    fs.rename(tmpPath, filePath)
   }
 
   override def stop() {
     val conf = new Configuration()
     val fs = FileSystem.get(conf)
-    fs.delete(new Path(driverFilePath), false);
+    fs.delete(new Path(driverFilePath), false)
     super.stopExecutors()
     super.stop()
   }
 }
-
-
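The start() path above is an instance of the classic write-to-temp-then-rename
trick: because the HDFS rename happens in one step, executors polling
driverFilePath see either no file or a complete one, never a half-written
driver URL. A standalone sketch of the pattern (the paths and the driver URL
are hypothetical; this mirrors, but is not, the code above):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object AtomicWriteDemo {
      def main(args: Array[String]): Unit = {
        // Hypothetical stand-ins for driverFilePath and its "_tmp" sibling.
        val filePath = new Path("/tmp/driverurl")
        val tmpPath = new Path("/tmp/driverurl_tmp")

        val fs = FileSystem.get(new Configuration())

        // 1. Write the whole payload to a side file (overwrite = true),
        //    so readers never see the file growing in place.
        val out = fs.create(tmpPath, true)
        out.writeUTF("akka://spark@example-host:7077/user/CoarseGrainedScheduler") // made-up URL
        out.close()

        // 2. Rename it into place. Readers of filePath observe either
        //    absence or the full contents.
        fs.rename(tmpPath, filePath)
      }
    }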