spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject git commit: Fix Scala Style
Date Thu, 24 Apr 2014 22:07:38 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 5ca01f681 -> 2250c7acb


Fix Scala Style

Any comments are welcome

Author: Sandeep <sandeep@techaddict.me>

Closes #531 from techaddict/stylefix-1 and squashes the following commits:

7492730 [Sandeep] Pass 4
98b2428 [Sandeep] fix rxin suggestions
b5e2e6f [Sandeep] Pass 3
05932d7 [Sandeep] fix if else styling 2
08690e5 [Sandeep] fix if else styling

(cherry picked from commit a03ac222d84025a1036750e1179136a13f75dea7)
Signed-off-by: Reynold Xin <rxin@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2250c7ac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2250c7ac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2250c7ac

Branch: refs/heads/branch-1.0
Commit: 2250c7acbd857efeda99d9e850cb0faff34e19f0
Parents: 5ca01f6
Author: Sandeep <sandeep@techaddict.me>
Authored: Thu Apr 24 15:07:23 2014 -0700
Committer: Reynold Xin <rxin@apache.org>
Committed: Thu Apr 24 15:07:32 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/Accumulators.scala   |  7 +-
 .../spark/deploy/SparkSubmitArguments.scala     |  3 +-
 .../org/apache/spark/deploy/master/Master.scala |  3 +-
 .../spark/deploy/worker/DriverRunner.scala      |  8 ++-
 .../apache/spark/deploy/worker/ui/LogPage.scala | 16 +++--
 .../org/apache/spark/storage/BlockManager.scala |  8 ++-
 .../spark/util/BoundedPriorityQueue.scala       | 12 ++--
 .../org/apache/spark/util/FileLogger.scala      |  4 +-
 .../scala/org/apache/spark/util/Utils.scala     |  3 +-
 .../scala/org/apache/spark/rdd/RDDSuite.scala   |  4 +-
 .../org/apache/spark/examples/LogQuery.scala    |  3 +-
 .../examples/clickstream/PageViewStream.scala   |  7 +-
 .../org/apache/spark/graphx/GraphOpsSuite.scala |  7 +-
 .../org/apache/spark/repl/SparkExprTyper.scala  | 13 ++--
 .../sql/parquet/ParquetTableOperations.scala    |  5 +-
 .../spark/sql/parquet/ParquetTableSupport.scala |  7 +-
 .../streaming/scheduler/ReceiverTracker.scala   |  3 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   |  9 +--
 .../spark/deploy/yarn/ExecutorLauncher.scala    |  3 +-
 .../deploy/yarn/YarnAllocationHandler.scala     | 67 ++++++++++----------
 20 files changed, 109 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2250c7ac/core/src/main/scala/org/apache/spark/Accumulators.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index d5f3e3f..6d652fa 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -104,8 +104,11 @@ class Accumulable[R, T] (
    * Set the accumulator's value; only allowed on master.
    */
   def value_= (newValue: R) {
-    if (!deserialized) value_ = newValue
-    else throw new UnsupportedOperationException("Can't assign accumulator value in task")
+    if (!deserialized) {
+      value_ = newValue
+    } else {
+      throw new UnsupportedOperationException("Can't assign accumulator value in task")
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/2250c7ac/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index cc97656..c3e8c6b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -66,8 +66,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
         if (k.startsWith("spark")) {
           defaultProperties(k) = v
           if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
-        }
-        else {
+        } else {
           SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/2250c7ac/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 81f990b..fdb633b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -237,8 +237,7 @@ private[spark] class Master(
             if (waitingDrivers.contains(d)) {
               waitingDrivers -= d
               self ! DriverStateChanged(driverId, DriverState.KILLED, None)
-            }
-            else {
+            } else {
              // We just notify the worker to kill the driver here. The final bookkeeping occurs
               // on the return path when the worker submits a state change back to the master
               // to notify it that the driver was successfully killed.

http://git-wip-us.apache.org/repos/asf/spark/blob/2250c7ac/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index f918b42..662d378 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -91,9 +91,11 @@ private[spark] class DriverRunner(
         }
 
         val state =
-          if (killed) { DriverState.KILLED }
-          else if (finalException.isDefined) { DriverState.ERROR }
-          else {
+          if (killed) {
+            DriverState.KILLED
+          } else if (finalException.isDefined) {
+            DriverState.ERROR
+          } else {
             finalExitCode match {
               case Some(0) => DriverState.FINISHED
               case _ => DriverState.FAILED

http://git-wip-us.apache.org/repos/asf/spark/blob/2250c7ac/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
index fec1207..8381f59 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
@@ -89,8 +89,7 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
             Previous {Utils.bytesToString(math.min(byteLength, startByte))}
           </button>
         </a>
-      }
-      else {
+      } else {
         <button type="button" class="btn btn-default" disabled="disabled">
           Previous 0 B
         </button>
@@ -104,8 +103,7 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
             Next {Utils.bytesToString(math.min(byteLength, logLength - endByte))}
           </button>
         </a>
-      }
-      else {
+      } else {
         <button type="button" class="btn btn-default" disabled="disabled">
           Next 0 B
         </button>
@@ -137,9 +135,13 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
     val logLength = file.length()
     val getOffset = offset.getOrElse(logLength - defaultBytes)
     val startByte =
-      if (getOffset < 0) 0L
-      else if (getOffset > logLength) logLength
-      else getOffset
+      if (getOffset < 0) {
+        0L
+      } else if (getOffset > logLength) {
+        logLength
+      } else {
+        getOffset
+      }
     val logPageLength = math.min(byteLength, maxBytes)
     val endByte = math.min(startByte + logPageLength, logLength)
     (startByte, endByte)

http://git-wip-us.apache.org/repos/asf/spark/blob/2250c7ac/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index ccd5c53..02ba5ec 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -281,7 +281,9 @@ private[spark] class BlockManager(
       val onDiskSize = status.diskSize
       master.updateBlockInfo(
         blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inTachyonSize)
-    } else true
+    } else {
+      true
+    }
   }
 
   /**
@@ -676,7 +678,7 @@ private[spark] class BlockManager(
               tachyonStore.putValues(blockId, iterator, level, false)
             case ArrayBufferValues(array) =>
               tachyonStore.putValues(blockId, array, level, false)
-            case ByteBufferValues(bytes) => 
+            case ByteBufferValues(bytes) =>
               bytes.rewind()
               tachyonStore.putBytes(blockId, bytes, level)
           }
@@ -695,7 +697,7 @@ private[spark] class BlockManager(
               diskStore.putValues(blockId, iterator, level, askForBytes)
             case ArrayBufferValues(array) =>
               diskStore.putValues(blockId, array, level, askForBytes)
-            case ByteBufferValues(bytes) => 
+            case ByteBufferValues(bytes) =>
               bytes.rewind()
               diskStore.putBytes(blockId, bytes, level)
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/2250c7ac/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
index b9f4a5d..1b2b193 100644
--- a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
+++ b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
@@ -43,8 +43,11 @@ private[spark] class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Orderin
   }
 
   override def +=(elem: A): this.type = {
-    if (size < maxSize) underlying.offer(elem)
-    else maybeReplaceLowest(elem)
+    if (size < maxSize) {
+      underlying.offer(elem)
+    } else {
+      maybeReplaceLowest(elem)
+    }
     this
   }
 
@@ -59,7 +62,8 @@ private[spark] class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Orderin
     if (head != null && ord.gt(a, head)) {
       underlying.poll()
       underlying.offer(a)
-    } else false
+    } else {
+      false
+    }
   }
 }
-

http://git-wip-us.apache.org/repos/asf/spark/blob/2250c7ac/core/src/main/scala/org/apache/spark/util/FileLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index 7d47b2a..1ed3b70 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -113,7 +113,9 @@ private[spark] class FileLogger(
    * @param withTime Whether to prepend message with a timestamp
    */
   def log(msg: String, withTime: Boolean = false) {
-    val writeInfo = if (!withTime) msg else {
+    val writeInfo = if (!withTime) {
+      msg
+    } else {
       val date = new Date(System.currentTimeMillis())
       dateFormat.get.format(date) + ": " + msg
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/2250c7ac/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a3af4e7..d333e2a 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -811,8 +811,7 @@ private[spark] object Utils extends Logging {
           } else {
             el.getMethodName
           }
-        }
-        else {
+        } else {
           firstUserLine = el.getLineNumber
           firstUserFile = el.getFileName
           firstUserClass = el.getClassName

http://git-wip-us.apache.org/repos/asf/spark/blob/2250c7ac/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index d7c9034..2676558 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -381,8 +381,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
       val prng42 = new Random(42)
       val prng43 = new Random(43)
       Array(1, 2, 3, 4, 5, 6).filter{i =>
-        if (i < 4) 0 == prng42.nextInt(3)
-        else 0 == prng43.nextInt(3)}
+        if (i < 4) 0 == prng42.nextInt(3) else 0 == prng43.nextInt(3)
+      }
     }
     assert(sample.size === checkSample.size)
     for (i <- 0 until sample.size) assert(sample(i) === checkSample(i))

http://git-wip-us.apache.org/repos/asf/spark/blob/2250c7ac/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
index 820e87d..f77a444 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
@@ -49,8 +49,7 @@ object LogQuery {
       System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq)
 
     val dataSet =
-      if (args.length == 2) sc.textFile(args(1))
-      else sc.parallelize(exampleApacheLogs)
+      if (args.length == 2) sc.textFile(args(1)) else sc.parallelize(exampleApacheLogs)
     // scalastyle:off
     val apacheLogRegex =
       """^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)".*""".r

http://git-wip-us.apache.org/repos/asf/spark/blob/2250c7ac/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
index edc769c..673013f 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -69,8 +69,11 @@ object PageViewStream {
         val normalCount = statuses.filter(_ == 200).size
         val errorCount = statuses.size - normalCount
         val errorRatio = errorCount.toFloat / statuses.size
-        if (errorRatio > 0.05) {"%s: **%s**".format(zip, errorRatio)}
-        else {"%s: %s".format(zip, errorRatio)}
+        if (errorRatio > 0.05) {
+          "%s: **%s**".format(zip, errorRatio)
+        } else {
+          "%s: %s".format(zip, errorRatio)
+        }
     }
 
     // Return the number unique users in last 15 seconds

http://git-wip-us.apache.org/repos/asf/spark/blob/2250c7ac/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
index a467ca1..ea94d4a 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
@@ -165,8 +165,11 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
       // not have any edges in the specified direction.
       assert(edges.count === 50)
       edges.collect.foreach {
-        case (vid, edges) => if (vid > 0 && vid < 49) assert(edges.size == 2)
-        else assert(edges.size == 1)
+        case (vid, edges) => if (vid > 0 && vid < 49) {
+          assert(edges.size == 2)
+        } else {
+          assert(edges.size == 1)
+        }
       }
       edges.collect.foreach {
         case (vid, edges) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/2250c7ac/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala b/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
index dcc1395..f8432c8 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
@@ -47,9 +47,13 @@ trait SparkExprTyper extends Logging {
     var isIncomplete = false
     reporter.withIncompleteHandler((_, _) => isIncomplete = true) {
       val trees = codeParser.stmts(line)
-      if (reporter.hasErrors) Some(Nil)
-      else if (isIncomplete) None
-      else Some(trees)
+      if (reporter.hasErrors) {
+        Some(Nil)
+      } else if (isIncomplete) {
+        None
+      } else {
+        Some(trees)
+      }
     }
   }
   // def parsesAsExpr(line: String) = {
@@ -70,8 +74,7 @@ trait SparkExprTyper extends Logging {
           val sym0 = symbolOfTerm(name)
           // drop NullaryMethodType
           val sym = sym0.cloneSymbol setInfo afterTyper(sym0.info.finalResultType)
-          if (sym.info.typeSymbol eq UnitClass) NoSymbol
-          else sym
+          if (sym.info.typeSymbol eq UnitClass) NoSymbol else sym
         case _          => NoSymbol
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/2250c7ac/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index d5846ba..f825ca3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -203,8 +203,9 @@ case class InsertIntoParquetTable(
     val stageId = sc.newRddId()
 
     val taskIdOffset =
-      if (overwrite) 1
-      else {
+      if (overwrite) {
+        1
+      } else {
         FileSystemHelper
           .findMaxTaskId(NewFileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/2250c7ac/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 84b1b46..71ba0fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -158,8 +158,11 @@ private[parquet] class CatalystGroupConverter(
     a => a.dataType match {
       case ctype: NativeType =>
         // note: for some reason matching for StringType fails so use this ugly if instead
-        if (ctype == StringType) new CatalystPrimitiveStringConverter(this, schema.indexOf(a))
-        else new CatalystPrimitiveConverter(this, schema.indexOf(a))
+        if (ctype == StringType) {
+          new CatalystPrimitiveStringConverter(this, schema.indexOf(a))
+        } else {
+          new CatalystPrimitiveConverter(this, schema.indexOf(a))
+        }
       case _ => throw new RuntimeException(
         s"unable to convert datatype ${a.dataType.toString} in CatalystGroupConverter")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/2250c7ac/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 3d2537f..557e096 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -240,8 +240,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
         if (hasLocationPreferences) {
           val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
           ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
-        }
-        else {
+        } else {
           ssc.sc.makeRDD(receivers, receivers.size)
         }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2250c7ac/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 2f74965..fc13dbe 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -147,12 +147,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
     val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
       .orElse(Option(System.getenv("LOCAL_DIRS")))
- 
+
     localDirs match {
       case None => throw new Exception("Yarn Local dirs can't be empty")
       case Some(l) => l
     }
-  } 
+  }
 
   private def getApplicationAttemptId(): ApplicationAttemptId = {
     val envs = System.getenv()
@@ -321,8 +321,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
             logInfo("Allocating %d containers to make up for (potentially) lost containers".
               format(missingExecutorCount))
             yarnAllocator.allocateContainers(missingExecutorCount)
+          } else {
+            sendProgress()
           }
-          else sendProgress()
           Thread.sleep(sleepTime)
         }
       }
@@ -361,7 +362,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
         return
       }
       isFinished = true
-      
+
       logInfo("finishApplicationMaster with " + status)
       if (registered) {
         val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])

http://git-wip-us.apache.org/repos/asf/spark/blob/2250c7ac/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index ea356f3..65b7215 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -243,8 +243,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
             logInfo("Allocating " + missingExecutorCount +
               " containers to make up for (potentially ?) lost containers")
             yarnAllocator.allocateContainers(missingExecutorCount)
+          } else {
+            sendProgress()
           }
-          else sendProgress()
           Thread.sleep(sleepTime)
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/2250c7ac/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 95f0f9d..856391e 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -60,12 +60,12 @@ object AllocationType extends Enumeration {
  */
 private[yarn] class YarnAllocationHandler(
     val conf: Configuration,
-    val resourceManager: AMRMProtocol, 
+    val resourceManager: AMRMProtocol,
     val appAttemptId: ApplicationAttemptId,
     val maxExecutors: Int,
     val executorMemory: Int,
     val executorCores: Int,
-    val preferredHostToCount: Map[String, Int], 
+    val preferredHostToCount: Map[String, Int],
     val preferredRackToCount: Map[String, Int],
     val sparkConf: SparkConf)
   extends Logging {
@@ -136,9 +136,10 @@ private[yarn] class YarnAllocationHandler(
           val containers = hostToContainers.getOrElseUpdate(host, new ArrayBuffer[Container]())
 
           containers += container
+        } else {
+          // Add all ignored containers to released list
+          releasedContainerList.add(container.getId())
         }
-        // Add all ignored containers to released list
-        else releasedContainerList.add(container.getId())
       }
 
       // Find the appropriate containers to use. Slightly non trivial groupBy ...
@@ -159,8 +160,7 @@ private[yarn] class YarnAllocationHandler(
           dataLocalContainers.put(candidateHost, remainingContainers)
           // all consumed
           remainingContainers = null
-        }
-        else if (requiredHostCount > 0) {
+        } else if (requiredHostCount > 0) {
           // Container list has more containers than we need for data locality.
           // Split into two : data local container count of (remainingContainers.size -
           // requiredHostCount) and rest as remainingContainer
@@ -170,7 +170,7 @@ private[yarn] class YarnAllocationHandler(
           // remainingContainers = remaining
 
           // yarn has nasty habit of allocating a tonne of containers on a host - discourage this :
-          // add remaining to release list. If we have insufficient containers, next allocation

+          // add remaining to release list. If we have insufficient containers, next allocation
           // cycle will reallocate (but wont treat it as data local)
           for (container <- remaining) releasedContainerList.add(container.getId())
           remainingContainers = null
@@ -182,7 +182,7 @@ private[yarn] class YarnAllocationHandler(
 
           if (rack != null){
             val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
-            val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) - 
+            val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
               rackLocalContainers.get(rack).getOrElse(List()).size
 
 
@@ -191,8 +191,7 @@ private[yarn] class YarnAllocationHandler(
               dataLocalContainers.put(rack, remainingContainers)
               // All consumed
               remainingContainers = null
-            }
-            else if (requiredRackCount > 0) {
+            } else if (requiredRackCount > 0) {
               // container list has more containers than we need for data locality.
               // Split into two : data local container count of (remainingContainers.size -
               // requiredRackCount) and rest as remainingContainer
@@ -213,7 +212,7 @@ private[yarn] class YarnAllocationHandler(
         }
       }
 
-      // Now that we have split the containers into various groups, go through them in order : 
+      // Now that we have split the containers into various groups, go through them in order :
       // first host local, then rack local and then off rack (everything else).
       // Note that the list we create below tries to ensure that not all containers end up within a
       // host if there are sufficiently large number of hosts/containers.
@@ -238,8 +237,7 @@ private[yarn] class YarnAllocationHandler(
           releasedContainerList.add(containerId)
           // reset counter back to old value.
           numExecutorsRunning.decrementAndGet()
-        }
-        else {
+        } else {
           // Deallocate + allocate can result in reusing id's wrongly - so use a different counter
           // (executorIdCounter)
           val executorId = executorIdCounter.incrementAndGet().toString
@@ -293,8 +291,7 @@ private[yarn] class YarnAllocationHandler(
         // Was this released by us ? If yes, then simply remove from containerSet and move on.
         if (pendingReleaseContainers.containsKey(containerId)) {
           pendingReleaseContainers.remove(containerId)
-        }
-        else {
+        } else {
           // Simply decrement count - next iteration of ReporterThread will take care of allocating.
           numExecutorsRunning.decrementAndGet()
           logInfo("Completed container %s (state: %s, exit status: %s)".format(
@@ -319,8 +316,11 @@ private[yarn] class YarnAllocationHandler(
             assert (containerSet != null)
 
             containerSet -= containerId
-            if (containerSet.isEmpty) allocatedHostToContainersMap.remove(host)
-            else allocatedHostToContainersMap.update(host, containerSet)
+            if (containerSet.isEmpty) {
+              allocatedHostToContainersMap.remove(host)
+            } else {
+              allocatedHostToContainersMap.update(host, containerSet)
+            }
 
             allocatedContainerToHostMap -= containerId
 
@@ -328,8 +328,11 @@ private[yarn] class YarnAllocationHandler(
             val rack = YarnAllocationHandler.lookupRack(conf, host)
             if (rack != null) {
               val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
-              if (rackCount > 0) allocatedRackCount.put(rack, rackCount)
-              else allocatedRackCount.remove(rack)
+              if (rackCount > 0) {
+                allocatedRackCount.put(rack, rackCount)
+              } else {
+                allocatedRackCount.remove(rack)
+              }
             }
           }
         }
@@ -365,10 +368,10 @@ private[yarn] class YarnAllocationHandler(
       }
     }
 
-    val requestedContainers: ArrayBuffer[ResourceRequest] = 
+    val requestedContainers: ArrayBuffer[ResourceRequest] =
       new ArrayBuffer[ResourceRequest](rackToCounts.size)
     for ((rack, count) <- rackToCounts){
-      requestedContainers += 
+      requestedContainers +=
         createResourceRequest(AllocationType.RACK, rack, count, YarnAllocationHandler.PRIORITY)
     }
 
@@ -401,11 +404,10 @@ private[yarn] class YarnAllocationHandler(
         preferredHostToCount.isEmpty)
       resourceRequests = List(createResourceRequest(
         AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY))
-    }
-    else {
-      // request for all hosts in preferred nodes and for numExecutors - 
+    } else {
+      // request for all hosts in preferred nodes and for numExecutors -
       // candidates.size, request by default allocation policy.
-      val hostContainerRequests: ArrayBuffer[ResourceRequest] = 
+      val hostContainerRequests: ArrayBuffer[ResourceRequest] =
         new ArrayBuffer[ResourceRequest](preferredHostToCount.size)
       for ((candidateHost, candidateCount) <- preferredHostToCount) {
         val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost)
@@ -449,8 +451,7 @@ private[yarn] class YarnAllocationHandler(
     if (numExecutors > 0) {
       logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors,
         executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
-    }
-    else {
+    } else {
       logDebug("Empty allocation req ..  release : " + releasedContainerList)
     }
 
@@ -467,7 +468,7 @@ private[yarn] class YarnAllocationHandler(
 
 
   private def createResourceRequest(
-    requestType: AllocationType.AllocationType, 
+    requestType: AllocationType.AllocationType,
     resource:String,
     numExecutors: Int,
     priority: Int): ResourceRequest = {
@@ -528,7 +529,7 @@ private[yarn] class YarnAllocationHandler(
     if (! retval.isEmpty) {
       releasedContainerList.removeAll(retval)
       for (v <- retval) pendingReleaseContainers.put(v, true)
-      logInfo("Releasing " + retval.size + " containers. pendingReleaseContainers : " + 
+      logInfo("Releasing " + retval.size + " containers. pendingReleaseContainers : " +
         pendingReleaseContainers)
     }
 
@@ -539,7 +540,7 @@ private[yarn] class YarnAllocationHandler(
 object YarnAllocationHandler {
 
   val ANY_HOST = "*"
-  // All requests are issued with same priority : we do not (yet) have any distinction between

+  // All requests are issued with same priority : we do not (yet) have any distinction between
   // request types (like map/reduce in hadoop for example)
   val PRIORITY = 1
 
@@ -548,7 +549,7 @@ object YarnAllocationHandler {
 
   // Host to rack map - saved from allocation requests
   // We are expecting this not to change.
-  // Note that it is possible for this to change : and RM will indicate that to us via update

+  // Note that it is possible for this to change : and RM will indicate that to us via update
   // response to allocate. But we are punting on handling that for now.
   private val hostToRack = new ConcurrentHashMap[String, String]()
   private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
@@ -565,7 +566,7 @@ object YarnAllocationHandler {
       conf,
       resourceManager,
       appAttemptId,
-      args.numExecutors, 
+      args.numExecutors,
       args.executorMemory,
       args.executorCores,
       Map[String, Int](),
@@ -587,7 +588,7 @@ object YarnAllocationHandler {
       conf,
       resourceManager,
       appAttemptId,
-      args.numExecutors, 
+      args.numExecutors,
       args.executorMemory,
       args.executorCores,
       hostToCount,


Mime
View raw message