spark-commits mailing list archives

From: pwend...@apache.org
Subject: [1/3] Merge pull request #557 from ScrapCodes/style. Closes #557.
Date: Sun, 09 Feb 2014 18:09:29 GMT
Updated Branches:
  refs/heads/master b6dba10ae -> b69f8b2a0


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index b47d786..c989ec0 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -59,10 +59,10 @@ object ZeroMQUtils {
    * @param jssc           JavaStreamingContext object
    * @param publisherUrl   Url of remote ZeroMQ publisher
    * @param subscribe      Topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
-   *                       of byte thus it needs the converter(which might be deserializer of bytes)
-   *                       to translate from sequence of sequence of bytes, where sequence refer to a frame
-   *                       and sub sequence refer to its payload.
+   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
+   *                       frame has sequence of byte thus it needs the converter(which might be
+   *                       deserializer of bytes) to translate from sequence of sequence of bytes,
+   *                       where sequence refer to a frame and sub sequence refer to its payload.
    * @param storageLevel  Storage level to use for storing the received objects
    */
   def createStream[T](
@@ -84,10 +84,10 @@ object ZeroMQUtils {
    * @param jssc           JavaStreamingContext object
    * @param publisherUrl   Url of remote zeromq publisher
    * @param subscribe      Topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
-   *                       of byte thus it needs the converter(which might be deserializer of bytes)
-   *                       to translate from sequence of sequence of bytes, where sequence refer to a frame
-   *                       and sub sequence refer to its payload.
+   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
+   *                       frame has sequence of byte thus it needs the converter(which might be
+   *                       deserializer of bytes) to translate from sequence of sequence of bytes,
+   *                       where sequence refer to a frame and sub sequence refer to its payload.
    * @param storageLevel   RDD storage level.
    */
   def createStream[T](
@@ -108,10 +108,11 @@ object ZeroMQUtils {
    * @param jssc           JavaStreamingContext object
    * @param publisherUrl   Url of remote zeromq publisher
    * @param subscribe      Topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
-   *                       of byte thus it needs the converter(which might be deserializer of bytes)
-   *                       to translate from sequence of sequence of bytes, where sequence refer to a frame
-   *                       and sub sequence refer to its payload.
+   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
+   *                       frame has sequence of byte thus it needs the converter(which might
+   *                       be deserializer of bytes) to translate from sequence of sequence of
+   *                       bytes, where sequence refer to a frame and sub sequence refer to its
+   *                       payload.
    */
   def createStream[T](
       jssc: JavaStreamingContext,
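
The bytesToObjects parameter documented in the reflowed scaladoc above is a converter from one ZeroMQ message -- a sequence of frames, each frame itself a sequence of bytes -- to user objects. A minimal standalone sketch of such a converter, assuming UTF-8 string payloads for illustration and not tied to the exact ZeroMQUtils signature:

import java.nio.charset.StandardCharsets

// Sketch only: translate one message (a sequence of frames, each a sequence
// of bytes) into an iterator of Strings; UTF-8 payloads are an assumption.
def bytesToStrings(frames: Seq[Seq[Byte]]): Iterator[String] =
  frames.iterator.map(frame => new String(frame.toArray, StandardCharsets.UTF_8))

// Hypothetical two-frame message: a topic frame followed by a payload frame.
def demo(): Unit = {
  val msg: Seq[Seq[Byte]] =
    Seq("topic".getBytes(StandardCharsets.UTF_8).toSeq,
        "payload".getBytes(StandardCharsets.UTF_8).toSeq)
  bytesToStrings(msg).foreach(println)   // prints "topic" then "payload"
}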

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index fe03ae4..799a9dd 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -66,7 +66,8 @@ class EdgeRDD[@specialized ED: ClassTag](
     this
   }
 
-  private[graphx] def mapEdgePartitions[ED2: ClassTag](f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2])
+  private[graphx] def mapEdgePartitions[ED2: ClassTag](
+    f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2])
     : EdgeRDD[ED2] = {
     new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
       val (pid, ep) = iter.next()
@@ -97,8 +98,8 @@ class EdgeRDD[@specialized ED: ClassTag](
    *
    * @param other the EdgeRDD to join with
    * @param f the join function applied to corresponding values of `this` and `other`
-   * @return a new EdgeRDD containing only edges that appear in both `this` and `other`, with values
-   * supplied by `f`
+   * @return a new EdgeRDD containing only edges that appear in both `this` and `other`,
+   *         with values supplied by `f`
    */
   def innerJoin[ED2: ClassTag, ED3: ClassTag]
       (other: EdgeRDD[ED2])

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index eea95d3..65a1a8c 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -171,8 +171,9 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
     : Graph[VD, ED2]
 
   /**
-   * Transforms each edge attribute using the map function, passing it the adjacent vertex attributes
-   * as well. If adjacent vertex values are not required, consider using `mapEdges` instead.
+   * Transforms each edge attribute using the map function, passing it the adjacent vertex
+   * attributes as well. If adjacent vertex values are not required,
+   * consider using `mapEdges` instead.
    *
    * @note This does not change the structure of the
    * graph or modify the values of this graph.  As a consequence
@@ -280,13 +281,13 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    * be commutative and associative and is used to combine the output
    * of the map phase
    *
-   * @param activeSetOpt optionally, a set of "active" vertices and a direction of edges to consider
-   * when running `mapFunc`. If the direction is `In`, `mapFunc` will only be run on edges with
-   * destination in the active set.  If the direction is `Out`, `mapFunc` will only be run on edges
-   * originating from vertices in the active set. If the direction is `Either`, `mapFunc` will be
-   * run on edges with *either* vertex in the active set. If the direction is `Both`, `mapFunc` will
-   * be run on edges with *both* vertices in the active set. The active set must have the same index
-   * as the graph's vertices.
+   * @param activeSetOpt optionally, a set of "active" vertices and a direction of edges to
+   * consider when running `mapFunc`. If the direction is `In`, `mapFunc` will only be run on
+   * edges with destination in the active set.  If the direction is `Out`,
+   * `mapFunc` will only be run on edges originating from vertices in the active set. If the
+   * direction is `Either`, `mapFunc` will be run on edges with *either* vertex in the active set
+   * . If the direction is `Both`, `mapFunc` will be run on edges with *both* vertices in the
+   * active set. The active set must have the same index as the graph's vertices.
    *
    * @example We can use this function to compute the in-degree of each
    * vertex
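
The scaladoc reflowed above closes with an in-degree example. A sketch of what that looks like against this tree, assuming the two-argument mapReduceTriplets(mapFunc, reduceFunc) form, Graph.fromEdges, and an existing SparkContext named sc (all plausible for this code base, but stated here as assumptions):

import org.apache.spark.SparkContext
import org.apache.spark.graphx._

// Sketch: count in-degrees by emitting a 1 toward every edge's destination
// and summing the messages per vertex.
def inDegreesSketch(sc: SparkContext): Unit = {
  val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(3L, 2L, 1), Edge(2L, 1L, 1)))
  val graph = Graph.fromEdges(edges, defaultValue = 0)
  val inDeg = graph.mapReduceTriplets[Int](et => Iterator((et.dstId, 1)), _ + _)
  inDeg.collect().foreach(println)   // vertex 2 has in-degree 2, vertex 1 has 1
}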

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index 9299153..0470d74 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -57,8 +57,9 @@ object PartitionStrategy {
    * </pre>
    *
    * The edge denoted by `E` connects `v11` with `v1` and is assigned to processor `P6`. To get the
-   * processor number we divide the matrix into `sqrt(numParts)` by `sqrt(numParts)` blocks.  Notice
-   * that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3, P6)` or the last
+   * processor number we divide the matrix into `sqrt(numParts)` by `sqrt(numParts)` blocks. Notice
+   * that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3,
+   * P6)` or the last
    * row of blocks `(P6, P7, P8)`.  As a consequence we can guarantee that `v11` will need to be
    * replicated to at most `2 * sqrt(numParts)` machines.
    *
@@ -66,11 +67,12 @@ object PartitionStrategy {
    * balance.  To improve balance we first multiply each vertex id by a large prime to shuffle the
    * vertex locations.
    *
-   * One of the limitations of this approach is that the number of machines must either be a perfect
-   * square. We partially address this limitation by computing the machine assignment to the next
+   * One of the limitations of this approach is that the number of machines must either be a
+   * perfect square. We partially address this limitation by computing the machine assignment to
+   * the next
    * largest perfect square and then mapping back down to the actual number of machines.
-   * Unfortunately, this can also lead to work imbalance and so it is suggested that a perfect square
-   * is used.
+   * Unfortunately, this can also lead to work imbalance and so it is suggested that a perfect
+   * square is used.
    */
   case object EdgePartition2D extends PartitionStrategy {
     override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
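
The EdgePartition2D description above maps each edge to a block in a ceil(sqrt(numParts)) x ceil(sqrt(numParts)) grid, taking the column from the source id and the row from the destination id after multiplying by a large prime. A hedged sketch of that computation; the specific prime and the non-negative modulo are choices made for illustration, not a verbatim copy of getPartition:

// Sketch of the 2D edge-partitioning idea described in the scaladoc above.
def edgePartition2DSketch(src: Long, dst: Long, numParts: Int): Int = {
  val ceilSqrt = math.ceil(math.sqrt(numParts)).toInt
  val mixingPrime = 1125899906842597L          // assumed large prime for shuffling ids
  def mod(x: Long, m: Int): Int = (((x % m) + m) % m).toInt   // always non-negative
  val col = mod(src * mixingPrime, ceilSqrt)
  val row = mod(dst * mixingPrime, ceilSqrt)
  (col * ceilSqrt + row) % numParts            // at most 2 * sqrt(numParts) machines per vertex
}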

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index edd59bc..d6788d4 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -59,7 +59,8 @@ class VertexRDD[@specialized VD: ClassTag](
 
   /**
    * Construct a new VertexRDD that is indexed by only the visible vertices. The resulting
-   * VertexRDD will be based on a different index and can no longer be quickly joined with this RDD.
+   * VertexRDD will be based on a different index and can no longer be quickly joined with this
+   * RDD.
    */
   def reindex(): VertexRDD[VD] = new VertexRDD(partitionsRDD.map(_.reindex()))
 
@@ -101,7 +102,8 @@ class VertexRDD[@specialized VD: ClassTag](
   /**
    * Applies a function to each `VertexPartition` of this RDD and returns a new VertexRDD.
    */
-  private[graphx] def mapVertexPartitions[VD2: ClassTag](f: VertexPartition[VD] => VertexPartition[VD2])
+  private[graphx] def mapVertexPartitions[VD2: ClassTag](
+    f: VertexPartition[VD] => VertexPartition[VD2])
     : VertexRDD[VD2] = {
     val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true)
     new VertexRDD(newPartitionsRDD)
@@ -159,8 +161,9 @@ class VertexRDD[@specialized VD: ClassTag](
   }
 
   /**
-   * Left joins this RDD with another VertexRDD with the same index. This function will fail if both
-   * VertexRDDs do not share the same index. The resulting vertex set contains an entry for each
+   * Left joins this RDD with another VertexRDD with the same index. This function will fail if
+   * both VertexRDDs do not share the same index. The resulting vertex set contains an entry for
+   * each
    * vertex in `this`. If `other` is missing any vertex in this VertexRDD, `f` is passed `None`.
    *
    * @tparam VD2 the attribute type of the other VertexRDD
@@ -187,8 +190,8 @@ class VertexRDD[@specialized VD: ClassTag](
    * Left joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is
    * backed by a VertexRDD with the same index then the efficient [[leftZipJoin]] implementation is
    * used. The resulting VertexRDD contains an entry for each vertex in `this`. If `other` is
-   * missing any vertex in this VertexRDD, `f` is passed `None`. If there are duplicates, the vertex
-   * is picked arbitrarily.
+   * missing any vertex in this VertexRDD, `f` is passed `None`. If there are duplicates,
+   * the vertex is picked arbitrarily.
    *
    * @tparam VD2 the attribute type of the other VertexRDD
    * @tparam VD3 the attribute type of the resulting VertexRDD
@@ -238,14 +241,14 @@ class VertexRDD[@specialized VD: ClassTag](
 
   /**
    * Inner joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is
-   * backed by a VertexRDD with the same index then the efficient [[innerZipJoin]] implementation is
-   * used.
+   * backed by a VertexRDD with the same index then the efficient [[innerZipJoin]] implementation
+   * is used.
    *
    * @param other an RDD containing vertices to join. If there are multiple entries for the same
    * vertex, one is picked arbitrarily. Use [[aggregateUsingIndex]] to merge multiple entries.
    * @param f the join function applied to corresponding values of `this` and `other`
-   * @return a VertexRDD co-indexed with `this`, containing only vertices that appear in both `this`
-   * and `other`, with values supplied by `f`
+   * @return a VertexRDD co-indexed with `this`, containing only vertices that appear in both
+   *         `this` and `other`, with values supplied by `f`
    */
   def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
       (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = {
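
The innerJoin signature shown in this hunk takes an RDD of (VertexId, U) pairs plus a combining function. A usage sketch, assuming Graph.fromEdges and graph.degrees from this tree's GraphX API and a SparkContext named sc:

import org.apache.spark.SparkContext
import org.apache.spark.graphx._

// Sketch: keep only vertices that also have a degree entry, replacing each
// vertex attribute by its degree via the innerJoin documented above.
def innerJoinSketch(sc: SparkContext): Unit = {
  val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1)))
  val graph = Graph.fromEdges(edges, defaultValue = 0.0)
  val degrees = graph.degrees                          // VertexRDD[Int]
  val joined = graph.vertices.innerJoin(degrees) { (vid, attr, deg) => deg }
  joined.collect().foreach(println)                    // (vertex id, degree) pairs
}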

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
index f914e05..24699df 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
@@ -82,7 +82,7 @@ object Analytics extends Logging {
 
         val pr = graph.pageRank(tol).vertices.cache()
 
-        println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_+_))
+        println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_ + _))
 
         if (!outFname.isEmpty) {
           logWarning("Saving pageranks of pages to " + outFname)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
index 7677641..f841846 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
@@ -37,11 +37,7 @@ object GraphGenerators {
 
   val RMATa = 0.45
   val RMATb = 0.15
-  val RMATc = 0.15
   val RMATd = 0.25
-
-  // Right now it just generates a bunch of edges where
-  // the edge data is the weight (default 1)
   /**
    * Generate a graph whose vertex out degree is log normal.
    */
@@ -59,15 +55,20 @@ object GraphGenerators {
     Graph(vertices, edges, 0)
   }
 
+  // Right now it just generates a bunch of edges where
+  // the edge data is the weight (default 1)
+  val RMATc = 0.15
+
   def generateRandomEdges(src: Int, numEdges: Int, maxVertexId: Int): Array[Edge[Int]] = {
     val rand = new Random()
     Array.fill(maxVertexId) { Edge[Int](src, rand.nextInt(maxVertexId), 1) }
   }
 
   /**
-   * Randomly samples from a log normal distribution whose corresponding normal distribution has the
-   * the given mean and standard deviation. It uses the formula `X = exp(m+s*Z)` where `m`, `s` are
-   * the mean, standard deviation of the lognormal distribution and `Z ~ N(0, 1)`. In this function,
+   * Randomly samples from a log normal distribution whose corresponding normal distribution has
+   * the given mean and standard deviation. It uses the formula `X = exp(m+s*Z)` where `m`,
+   * `s` are the mean, standard deviation of the lognormal distribution and
+   * `Z ~ N(0, 1)`. In this function,
    * `m = e^(mu+sigma^2/2)` and `s = sqrt[(e^(sigma^2) - 1)(e^(2*mu+sigma^2))]`.
    *
    * @param mu the mean of the normal distribution
@@ -76,7 +77,7 @@ object GraphGenerators {
    */
   private def sampleLogNormal(mu: Double, sigma: Double, maxVal: Int): Int = {
     val rand = new Random()
-    val m = math.exp(mu+(sigma*sigma)/2.0)
+    val m = math.exp(mu + (sigma * sigma) / 2.0)
     val s = math.sqrt((math.exp(sigma*sigma) - 1) * math.exp(2*mu + sigma*sigma))
     // Z ~ N(0, 1)
     var X: Double = maxVal
@@ -169,9 +170,9 @@ object GraphGenerators {
       val newT = math.round(t.toFloat/2.0).toInt
       pickQuadrant(RMATa, RMATb, RMATc, RMATd) match {
         case 0 => chooseCell(x, y, newT)
-        case 1 => chooseCell(x+newT, y, newT)
-        case 2 => chooseCell(x, y+newT, newT)
-        case 3 => chooseCell(x+newT, y+newT, newT)
+        case 1 => chooseCell(x + newT, y, newT)
+        case 2 => chooseCell(x, y + newT, newT)
+        case 3 => chooseCell(x + newT, y + newT, newT)
       }
     }
   }
@@ -179,8 +180,8 @@ object GraphGenerators {
   // TODO(crankshaw) turn result into an enum (or case class for pattern matching}
   private def pickQuadrant(a: Double, b: Double, c: Double, d: Double): Int = {
     if (a + b + c + d != 1.0) {
-      throw new IllegalArgumentException(
-        "R-MAT probability parameters sum to " + (a+b+c+d) + ", should sum to 1.0")
+      throw new IllegalArgumentException("R-MAT probability parameters sum to " + (a + b + c + d)
+        + ", should sum to 1.0")
     }
     val rand = new Random()
     val result = rand.nextDouble()
@@ -212,8 +213,8 @@ object GraphGenerators {
       sc.parallelize(0 until rows).flatMap( r => (0 until cols).map( c => (sub2ind(r,c), (r,c)) ) )
     val edges: RDD[Edge[Double]] =
       vertices.flatMap{ case (vid, (r,c)) =>
-        (if (r+1 < rows) { Seq( (sub2ind(r, c), sub2ind(r+1, c))) } else { Seq.empty }) ++
-        (if (c+1 < cols) { Seq( (sub2ind(r, c), sub2ind(r, c+1))) } else { Seq.empty })
+        (if (r + 1 < rows) { Seq( (sub2ind(r, c), sub2ind(r + 1, c))) } else { Seq.empty }) ++
+        (if (c + 1 < cols) { Seq( (sub2ind(r, c), sub2ind(r, c + 1))) } else { Seq.empty })
       }.map{ case (src, dst) => Edge(src, dst, 1.0) }
     Graph(vertices, edges)
   } // end of gridGraph
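
The sampleLogNormal doc comment above describes exponentiating a standard normal draw, and the surrounding code seeds X at maxVal, which suggests re-drawing until the sample falls below that cap. A standalone sketch under those assumptions (it is not the file's implementation, just the formula from the comment):

import scala.util.Random

// Sketch: X = exp(mu + sigma * Z) with Z ~ N(0, 1), re-sampled until X < maxVal.
// Using the underlying normal's mu and sigma directly is an assumption here.
def sampleLogNormalSketch(mu: Double, sigma: Double, maxVal: Int): Int = {
  val rand = new Random(42)            // fixed seed so the sketch is reproducible
  var x = maxVal.toDouble
  while (x >= maxVal) {
    x = math.exp(mu + sigma * rand.nextGaussian())
  }
  math.round(x.toFloat)
}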

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index efc0eb9..efe99a3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -106,7 +106,8 @@ class PythonMLLibAPI extends Serializable {
     bytes
   }
 
-  private def trainRegressionModel(trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
+  private def trainRegressionModel(
+      trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
       dataBytesJRDD: JavaRDD[Array[Byte]], initialWeightsBA: Array[Byte]):
       java.util.LinkedList[java.lang.Object] = {
     val data = dataBytesJRDD.rdd.map(xBytes => {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
index e476b53..8803c4c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
@@ -105,7 +105,7 @@ object SVD {
       cols.flatMap{ case (colind1, mval1) =>
                     cols.map{ case (colind2, mval2) =>
                             ((colind1, colind2), mval1*mval2) } }
-    }.reduceByKey(_+_)
+    }.reduceByKey(_ + _)
 
     // Construct jblas A^T A locally
     val ata = DoubleMatrix.zeros(n, n)
@@ -145,10 +145,10 @@ object SVD {
     // Multiply A by VS^-1
     val aCols = data.map(entry => (entry.j, (entry.i, entry.mval)))
     val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
-    val retUdata = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) )
-        => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_)
+    val retUdata = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)))
+        => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_ + _)
           .map{ case ((row, col), mval) => MatrixEntry(row, col, mval)}
-    val retU = SparseMatrix(retUdata, m, sigma.length) 
+    val retU = SparseMatrix(retUdata, m, sigma.length)
    
     MatrixSVD(retU, retS, retV)  
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
index b77364e..cd80134 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
@@ -142,7 +142,7 @@ object GradientDescent extends Logging {
     var regVal = 0.0
 
     for (i <- 1 to numIterations) {
-      val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42+i).map {
+      val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i).map {
         case (y, features) =>
           val featuresCol = new DoubleMatrix(features.length, 1, features:_*)
           val (grad, loss) = gradient.compute(featuresCol, y, weights)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index c5f64b1..a990e0f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -84,8 +84,9 @@ case class Rating(val user: Int, val product: Int, val rating: Double)
  * [[http://research.yahoo.com/pub/2433]], adapted for the blocked approach used here.
  *
  * Essentially instead of finding the low-rank approximations to the rating matrix `R`,
- * this finds the approximations for a preference matrix `P` where the elements of `P` are 1 if r > 0
- * and 0 if r = 0. The ratings then act as 'confidence' values related to strength of indicated user
+ * this finds the approximations for a preference matrix `P` where the elements of `P` are 1 if
+ * r > 0 and 0 if r = 0. The ratings then act as 'confidence' values related to strength of
+ * indicated user
  * preferences rather than explicit ratings given to items.
  */
 class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var lambda: Double,
@@ -152,8 +153,8 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
     val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock)
     val (productInLinks, productOutLinks) = makeLinkRDDs(numBlocks, ratingsByProductBlock)
 
-    // Initialize user and product factors randomly, but use a deterministic seed for each partition
-    // so that fault recovery works
+    // Initialize user and product factors randomly, but use a deterministic seed for each
+    // partition so that fault recovery works
     val seedGen = new Random()
     val seed1 = seedGen.nextInt()
     val seed2 = seedGen.nextInt()
@@ -268,7 +269,8 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
       val groupedRatings = blockRatings(productBlock).groupBy(_.product).toArray
       // Sort them by product ID
       val ordering = new Ordering[(Int, ArrayBuffer[Rating])] {
-        def compare(a: (Int, ArrayBuffer[Rating]), b: (Int, ArrayBuffer[Rating])): Int = a._1 - b._1
+        def compare(a: (Int, ArrayBuffer[Rating]), b: (Int, ArrayBuffer[Rating])): Int =
+            a._1 - b._1
       }
       Sorting.quickSort(groupedRatings)(ordering)
       // Translate the user IDs to indices based on userIdToPos
@@ -369,7 +371,8 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
     val tempXtX = DoubleMatrix.zeros(triangleSize)
     val fullXtX = DoubleMatrix.zeros(rank, rank)
 
-    // Compute the XtX and Xy values for each user by adding products it rated in each product block
+    // Compute the XtX and Xy values for each user by adding products it rated in each product
+    // block
     for (productBlock <- 0 until numBlocks) {
       for (p <- 0 until blockFactors(productBlock).length) {
         val x = new DoubleMatrix(blockFactors(productBlock)(p))
@@ -544,9 +547,8 @@ object ALS {
    * @param iterations number of iterations of ALS (recommended: 10-20)
    * @param lambda     regularization factor (recommended: 0.01)
    */
-  def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, alpha: Double)
-  : MatrixFactorizationModel =
-  {
+  def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double,
+      alpha: Double): MatrixFactorizationModel = {
     trainImplicit(ratings, rank, iterations, lambda, -1, alpha)
   }
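
The class comment in the first ALS hunk describes the implicit-preference formulation: ratings become a binary preference matrix P, and their magnitudes act as confidence weights. A small illustration of that mapping; the 1 + alpha * r confidence form follows the implicit-feedback paper cited there and is an assumption here, not a statement about this file's internals:

// Sketch: map a raw rating r to (preference, confidence) as the scaladoc describes.
def toPreferenceAndConfidence(r: Double, alpha: Double): (Double, Double) = {
  val p = if (r > 0) 1.0 else 0.0            // P is 1 if r > 0 and 0 if r = 0
  val c = 1.0 + alpha * math.abs(r)          // confidence grows with rating strength
  (p, c)
}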
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 11a937e..bb79f0c 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -21,6 +21,8 @@ import Keys._
 import sbtassembly.Plugin._
 import AssemblyKeys._
 import scala.util.Properties
+import org.scalastyle.sbt.ScalastylePlugin.{Settings => ScalaStyleSettings}
+
 // For Sonatype publishing
 //import com.jsuereth.pgp.sbtplugin.PgpKeys._
 
@@ -231,7 +233,7 @@ object SparkBuild extends Build {
     publishMavenStyle in MavenCompile := true,
     publishLocal in MavenCompile <<= publishTask(publishLocalConfiguration in MavenCompile, deliverLocal),
     publishLocalBoth <<= Seq(publishLocal in MavenCompile, publishLocal).dependOn
-  ) ++ net.virtualvoid.sbt.graph.Plugin.graphSettings
+  ) ++ net.virtualvoid.sbt.graph.Plugin.graphSettings ++ ScalaStyleSettings
 
   val slf4jVersion = "1.7.2"
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/project/build.properties
----------------------------------------------------------------------
diff --git a/project/build.properties b/project/build.properties
index 839f5fb..4b52bb9 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -14,4 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-sbt.version=0.12.4
+sbt.version=0.13.1

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/project/plugins.sbt
----------------------------------------------------------------------
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 4ba0e42..914f2e0 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,10 +1,10 @@
+scalaVersion := "2.10.3"
+
 resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)
 
 resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
 
-resolvers += "Spray Repository" at "http://repo.spray.cc/"
-
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.9.2")
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.10.2")
 
 addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0")
 
@@ -15,4 +15,7 @@ addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.5.1")
 
 //addSbtPlugin("com.jsuereth" % "xsbt-gpg-plugin" % "0.6")
 
-addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.3")
+addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.4")
+
+addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.4.0")
+

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/project/project/SparkPluginBuild.scala
----------------------------------------------------------------------
diff --git a/project/project/SparkPluginBuild.scala b/project/project/SparkPluginBuild.scala
index 4853be2..0392a60 100644
--- a/project/project/SparkPluginBuild.scala
+++ b/project/project/SparkPluginBuild.scala
@@ -20,5 +20,5 @@ import sbt._
 object SparkPluginDef extends Build {
   lazy val root = Project("plugins", file(".")) dependsOn(junitXmlListener)
   /* This is not published in a Maven repository, so we get it from GitHub directly */
-  lazy val junitXmlListener = uri("https://github.com/ijuma/junit_xml_listener.git#fe434773255b451a38e8d889536ebc260f4225ce")
+  lazy val junitXmlListener = uri("git://github.com/chenkelmann/junit_xml_listener.git#3f8029fbfda54dc7a68b1afd2f885935e1090016")
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index 3e17184..e3bcf7f 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -39,20 +39,22 @@ extends ClassLoader(parent) {
 
   // Hadoop FileSystem object for our URI, if it isn't using HTTP
   var fileSystem: FileSystem = {
-    if (uri.getScheme() == "http")
+    if (uri.getScheme() == "http") {
       null
-    else
+    } else {
       FileSystem.get(uri, new Configuration())
+    }
   }
   
   override def findClass(name: String): Class[_] = {
     try {
       val pathInDirectory = name.replace('.', '/') + ".class"
       val inputStream = {
-        if (fileSystem != null)
+        if (fileSystem != null) {
           fileSystem.open(new Path(directory, pathInDirectory))
-        else
+        } else {
           new URL(classUri + "/" + urlEncode(pathInDirectory)).openStream()
+        }
       }
       val bytes = readAndTransformClass(name, inputStream)
       inputStream.close()
@@ -81,10 +83,11 @@ extends ClassLoader(parent) {
       var done = false
       while (!done) {
         val num = in.read(bytes)
-        if (num >= 0)
+        if (num >= 0) {
           bos.write(bytes, 0, num)
-        else
+        } else {
           done = true
+        }
       }
       return bos.toByteArray
     }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala b/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
index b2e1df1..dcc1395 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
@@ -1,3 +1,5 @@
+// scalastyle:off
+
 /* NSC -- new Scala compiler
  * Copyright 2005-2013 LAMP/EPFL
  * @author  Paul Phillips

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 87d94d5..bc25b50 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -1,3 +1,5 @@
+// scalastyle:off
+
 /* NSC -- new Scala compiler
  * Copyright 2005-2013 LAMP/EPFL
  * @author Alexander Spoon

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
index efe4524..3ebf288 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
@@ -1,3 +1,5 @@
+// scalastyle:off
+
 /* NSC -- new Scala compiler
  * Copyright 2005-2013 LAMP/EPFL
  * @author Paul Phillips

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
index 59fdb0b..1d73d0b 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -1,3 +1,5 @@
+// scalastyle:off
+
 /* NSC -- new Scala compiler
  * Copyright 2005-2013 LAMP/EPFL
  * @author  Martin Odersky

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala b/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala
index 6408420..8f61a5e 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala
@@ -1,3 +1,5 @@
+// scalastyle:off
+
 /* NSC -- new Scala compiler
  * Copyright 2005-2013 LAMP/EPFL
  * @author  Paul Phillips

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/repl/src/main/scala/org/apache/spark/repl/SparkJLineCompletion.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkJLineCompletion.scala b/repl/src/main/scala/org/apache/spark/repl/SparkJLineCompletion.scala
index 8865f82..3159b70 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkJLineCompletion.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkJLineCompletion.scala
@@ -1,3 +1,5 @@
+// scalastyle:off
+
 /* NSC -- new Scala compiler
  * Copyright 2005-2013 LAMP/EPFL
  * @author Paul Phillips

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/repl/src/main/scala/org/apache/spark/repl/SparkJLineReader.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkJLineReader.scala b/repl/src/main/scala/org/apache/spark/repl/SparkJLineReader.scala
index 60a4d78..946e710 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkJLineReader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkJLineReader.scala
@@ -1,3 +1,5 @@
+// scalastyle:off
+
 /* NSC -- new Scala compiler
  * Copyright 2005-2013 LAMP/EPFL
  * @author Stepan Koltsov

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/repl/src/main/scala/org/apache/spark/repl/SparkMemberHandlers.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkMemberHandlers.scala b/repl/src/main/scala/org/apache/spark/repl/SparkMemberHandlers.scala
index 382f836..13cd2b7 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkMemberHandlers.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkMemberHandlers.scala
@@ -1,3 +1,5 @@
+// scalastyle:off
+
 /* NSC -- new Scala compiler
  * Copyright 2005-2013 LAMP/EPFL
  * @author  Martin Odersky

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
new file mode 100644
index 0000000..7527232
--- /dev/null
+++ b/scalastyle-config.xml
@@ -0,0 +1,126 @@
+<!-- If you wish to turn off checking for a section of code, you can put a comment in the source before and after the section, with the following syntax: -->
+<!-- // scalastyle:off -->
+<!-- ... -->
+<!-- // naughty stuff -->
+<!-- ... -->
+<!-- // scalastyle:on -->
+
+<scalastyle>
+ <name>Scalastyle standard configuration</name>
+ <check level="error" class="org.scalastyle.file.FileTabChecker" enabled="true"></check>
+ <!-- <check level="error" class="org.scalastyle.file.FileLengthChecker" enabled="true"> -->
+ <!--  <parameters> -->
+ <!--   <parameter name="maxFileLength"><![CDATA[800]]></parameter> -->
+ <!--  </parameters> -->
+ <!-- </check> -->
+ <check level="error" class="org.scalastyle.file.HeaderMatchesChecker" enabled="true">
+  <parameters>
+   <parameter name="header"><![CDATA[/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */]]></parameter>
+  </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"></check>
+ <check level="error" class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="false"></check>
+ <check level="error" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"></check>
+ <check level="error" class="org.scalastyle.file.FileLineLengthChecker" enabled="true">
+  <parameters>
+   <parameter name="maxLineLength"><![CDATA[100]]></parameter>
+   <parameter name="tabSize"><![CDATA[2]]></parameter>
+   <parameter name="ignoreImports">true</parameter>
+  </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.scalariform.ClassNamesChecker" enabled="true">
+  <parameters>
+   <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
+  </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true">
+  <parameters>
+   <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
+  </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true">
+  <parameters>
+   <parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter>
+  </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="false"></check>
+ <!-- <check level="error" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true"> -->
+ <!--  <parameters> -->
+ <!--   <parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter> -->
+ <!--  </parameters> -->
+ <!-- </check> -->
+ <check level="error" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
+  <parameters>
+   <parameter name="maxParameters"><![CDATA[10]]></parameter>
+  </parameters>
+ </check>
+ <!-- <check level="error" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="true"> -->
+ <!--  <parameters> -->
+ <!--   <parameter name="ignore"><![CDATA[-1,0,1,2,3]]></parameter> -->
+ <!--  </parameters> -->
+ <!-- </check> -->
+ <check level="error" class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker" enabled="false"></check>
+ <check level="error" class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker" enabled="false"></check>
+ <!-- <check level="error" class="org.scalastyle.scalariform.ReturnChecker" enabled="true"></check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.NullChecker" enabled="true"></check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.NoCloneChecker" enabled="true"></check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.CovariantEqualsChecker" enabled="true"></check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.StructuralTypeChecker" enabled="true"></check> -->
+ <!-- <check level="error" class="org.scalastyle.file.RegexChecker" enabled="true"> -->
+ <!--  <parameters> -->
+ <!--   <parameter name="regex"><![CDATA[println]]></parameter> -->
+ <!--  </parameters> -->
+ <!-- </check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.NumberOfTypesChecker" enabled="true"> -->
+ <!--  <parameters> -->
+ <!--   <parameter name="maxTypes"><![CDATA[30]]></parameter> -->
+ <!--  </parameters> -->
+ <!-- </check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.CyclomaticComplexityChecker" enabled="true"> -->
+ <!--  <parameters> -->
+ <!--   <parameter name="maximum"><![CDATA[10]]></parameter> -->
+ <!--  </parameters> -->
+ <!-- </check> -->
+ <check level="error" class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"></check>
+ <check level="error" class="org.scalastyle.scalariform.SimplifyBooleanExpressionChecker" enabled="false"></check>
+ <check level="error" class="org.scalastyle.scalariform.IfBraceChecker" enabled="true">
+  <parameters>
+   <parameter name="singleLineAllowed"><![CDATA[true]]></parameter>
+   <parameter name="doubleLineAllowed"><![CDATA[true]]></parameter>
+  </parameters>
+ </check>
+ <!-- <check level="error" class="org.scalastyle.scalariform.MethodLengthChecker" enabled="true"> -->
+ <!--  <parameters> -->
+ <!--   <parameter name="maxLength"><![CDATA[50]]></parameter> -->
+ <!--  </parameters> -->
+ <!-- </check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="true"> -->
+ <!--  <parameters> -->
+ <!--   <parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter> -->
+ <!--  </parameters> -->
+ <!-- </check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.NumberOfMethodsInTypeChecker" enabled="true"> -->
+ <!--  <parameters> -->
+ <!--   <parameter name="maxMethods"><![CDATA[30]]></parameter> -->
+ <!--  </parameters> -->
+ <!-- </check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check> -->
+ <check level="error" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check>
+ <check level="error" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check>
+</scalastyle>
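
The comment block at the top of the new config describes the selective-disable syntax, and the repl sources patched above start using it. A small Scala illustration of how a section is exempted:

object StyleExempt {
  // scalastyle:off
  // Checks are suspended here, e.g. for forked third-party code such as the
  // repl sources above, or a line that must exceed the configured 100-character limit.
  val banner = "this literal is exempt from the checks configured in scalastyle-config.xml"
  // scalastyle:on
  // Normal checking resumes from this point on.
}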

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index 5481393..6a45bc2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -47,7 +47,8 @@ object MasterFailureTest extends Logging {
   def main(args: Array[String]) {
     if (args.size < 2) {
       println(
-        "Usage: MasterFailureTest <local/HDFS directory> <# batches> [<batch size in milliseconds>]")
+        "Usage: MasterFailureTest <local/HDFS directory> <# batches> " +
+          "[<batch size in milliseconds>]")
       System.exit(1)
     }
     val directory = args(0)
@@ -186,7 +187,8 @@ object MasterFailureTest extends Logging {
 
     // Setup the streaming computation with the given operation
     System.clearProperty("spark.driver.port")
-    val ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map())
+    val ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil,
+      Map())
     ssc.checkpoint(checkpointDir.toString)
     val inputStream = ssc.textFileStream(testDir.toString)
     val operatedStream = operation(inputStream)
@@ -287,7 +289,7 @@ object MasterFailureTest extends Logging {
   private def verifyOutput[T: ClassTag](output: Seq[T], expectedOutput: Seq[T]) {
     // Verify whether expected outputs do not consecutive batches with same output
     for (i <- 0 until expectedOutput.size - 1) {
-      assert(expectedOutput(i) != expectedOutput(i+1),
+      assert(expectedOutput(i) != expectedOutput(i + 1),
         "Expected output has consecutive duplicate sequence of values")
     }
 
@@ -384,9 +386,9 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
       Thread.sleep(5000) // To make sure that all the streaming context has been set up
       for (i <- 0 until input.size) {
         // Write the data to a local file and then move it to the target test directory
-        val localFile = new File(localTestDir, (i+1).toString)
-        val hadoopFile = new Path(testDir, (i+1).toString)
-        val tempHadoopFile = new Path(testDir, ".tmp_" + (i+1).toString)
+        val localFile = new File(localTestDir, (i + 1).toString)
+        val hadoopFile = new Path(testDir, (i + 1).toString)
+        val tempHadoopFile = new Path(testDir, ".tmp_" + (i + 1).toString)
         FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
         var tries = 0
         var done = false
@@ -400,7 +402,8 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
           } catch {
             case ioe: IOException => {
                   fs = testDir.getFileSystem(new Configuration())
-                  logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe)
+                  logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.",
+                    ioe)
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
index 4886cd6..4205224 100644
--- a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
@@ -152,7 +152,8 @@ object JavaAPICompletenessChecker {
               if (parameters(0).name == classOf[Tuple2[_, _]].getName) {
                 val tupleParams =
                   parameters(0).asInstanceOf[ParameterizedType].parameters.map(applySubs)
-                ParameterizedType("org.apache.spark.streaming.api.java.JavaPairDStream", tupleParams)
+                ParameterizedType("org.apache.spark.streaming.api.java.JavaPairDStream",
+                  tupleParams)
               } else {
                 ParameterizedType("org.apache.spark.streaming.api.java.JavaDStream",
                   parameters.map(applySubs))
@@ -175,7 +176,8 @@ object JavaAPICompletenessChecker {
                 ParameterizedType("org.apache.spark.api.java.function.VoidFunction",
                   parameters.dropRight(1).map(applySubs))
               } else {
-                ParameterizedType("org.apache.spark.api.java.function.Function", parameters.map(applySubs))
+                ParameterizedType("org.apache.spark.api.java.function.Function",
+                  parameters.map(applySubs))
               }
             case _ =>
               ParameterizedType(renameSubstitutions.getOrElse(name, name),

