spark-commits mailing list archives

From: r...@apache.org
Subject: spark git commit: [SPARK-7927] whitespace fixes for streaming.
Date: Fri, 29 May 2015 00:55:26 GMT
Repository: spark
Updated Branches:
  refs/heads/master 1bd63e82f -> 3af0b3136


[SPARK-7927] whitespace fixes for streaming.

So we can enable a whitespace enforcement rule in the style checker to save code review time.
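
For context: Spark's Scala style checks run through Scalastyle, so a rule of
this kind would flag missing or stray whitespace around commas, assignment,
and control keywords. The snippet below is a minimal, hypothetical sketch of
the conventions the diff converges on; it is illustrative only, not the
actual checker rule or its configuration.

// Hypothetical examples only; not the Scalastyle rule itself.
object WhitespaceExamples {
  import scala.collection.mutable

  // Flagged style (as on the '-' lines below): HashMap[Time, RDD[(K,V)]],
  // newFilesOnly=true, while(!stopped), for(i <- 0 to 1)

  // Accepted style (as on the '+' lines):
  val counts = new mutable.HashMap[String, Long]()  // space after comma in type args
  val newFilesOnly = true                           // single spaces around '='

  def spin(n: Int): Unit = {
    var i = 0
    while (i < n) {                                 // space between 'while' and '('
      i += 1
    }
    for (j <- 0 until n) {                          // space between 'for' and '('
      // body elided
    }
  }
}

The enforcement itself would presumably live as regex-based checks in
scalastyle-config.xml; that configuration change is not part of this diff.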

Author: Reynold Xin <rxin@databricks.com>

Closes #6475 from rxin/whitespace-streaming and squashes the following commits:

810dae4 [Reynold Xin] Fixed tests.
89068ad [Reynold Xin] [SPARK-7927] whitespace fixes for streaming.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3af0b313
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3af0b313
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3af0b313

Branch: refs/heads/master
Commit: 3af0b3136e4b7dea52c413d640653ccddc638574
Parents: 1bd63e8
Author: Reynold Xin <rxin@databricks.com>
Authored: Thu May 28 17:55:22 2015 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Thu May 28 17:55:22 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/streaming/StreamingContext.scala  | 2 +-
 .../apache/spark/streaming/api/java/JavaPairDStream.scala    | 8 ++++----
 .../scala/org/apache/spark/streaming/dstream/DStream.scala   | 2 +-
 .../apache/spark/streaming/dstream/FileInputDStream.scala    | 8 ++++----
 .../spark/streaming/dstream/PairDStreamFunctions.scala       | 2 +-
 .../spark/streaming/dstream/ReducedWindowedDStream.scala     | 8 ++++----
 .../org/apache/spark/streaming/dstream/ShuffledDStream.scala | 6 +++---
 .../org/apache/spark/streaming/dstream/StateDStream.scala    | 2 +-
 .../org/apache/spark/streaming/dstream/WindowedDStream.scala | 4 ++--
 .../org/apache/spark/streaming/receiver/BlockGenerator.scala | 2 +-
 .../org/apache/spark/streaming/receiver/RateLimiter.scala    | 3 ++-
 .../apache/spark/streaming/scheduler/ReceiverTracker.scala   | 2 +-
 .../apache/spark/streaming/util/FileBasedWriteAheadLog.scala | 2 +-
 .../org/apache/spark/streaming/util/RawTextHelper.scala      | 4 ++--
 .../org/apache/spark/streaming/BasicOperationsSuite.scala    | 6 +++---
 .../scala/org/apache/spark/streaming/InputStreamsSuite.scala | 2 +-
 .../org/apache/spark/streaming/StreamingContextSuite.scala   | 4 +++-
 .../org/apache/spark/streaming/StreamingListenerSuite.scala  | 6 ++++--
 .../streaming/ui/StreamingJobProgressListenerSuite.scala     | 2 +-
 19 files changed, 40 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3af0b313/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 5e58ed7..25842d5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -461,7 +461,7 @@ class StreamingContext private[streaming] (
     val conf = sc_.hadoopConfiguration
     conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
     val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
-      directory, FileInputDStream.defaultFilter : Path => Boolean, newFilesOnly=true, conf)
+      directory, FileInputDStream.defaultFilter: Path => Boolean, newFilesOnly = true, conf)
     val data = br.map { case (k, v) =>
       val bytes = v.getBytes
       require(bytes.length == recordLength, "Byte array does not have correct length. " +

http://git-wip-us.apache.org/repos/asf/spark/blob/3af0b313/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 93baad1..959ac9c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -227,7 +227,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * @param numPartitions  Number of partitions of each RDD in the new DStream.
    */
   def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
-  :JavaPairDStream[K, JIterable[V]] = {
+    : JavaPairDStream[K, JIterable[V]] = {
     dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions)
       .mapValues(asJavaIterable _)
   }
@@ -247,7 +247,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       windowDuration: Duration,
       slideDuration: Duration,
       partitioner: Partitioner
-    ):JavaPairDStream[K, JIterable[V]] = {
+    ): JavaPairDStream[K, JIterable[V]] = {
     dstream.groupByKeyAndWindow(windowDuration, slideDuration, partitioner)
       .mapValues(asJavaIterable _)
   }
@@ -262,7 +262,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    *                       batching interval
    */
   def reduceByKeyAndWindow(reduceFunc: JFunction2[V, V, V], windowDuration: Duration)
-  :JavaPairDStream[K, V] = {
+    : JavaPairDStream[K, V] = {
     dstream.reduceByKeyAndWindow(reduceFunc, windowDuration)
   }
 
@@ -281,7 +281,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       reduceFunc: JFunction2[V, V, V],
       windowDuration: Duration,
       slideDuration: Duration
-    ):JavaPairDStream[K, V] = {
+    ): JavaPairDStream[K, V] = {
     dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3af0b313/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index c858647..6efcc19 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -659,7 +659,7 @@ abstract class DStream[T: ClassTag] (
     // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
     val cleanedF = context.sparkContext.clean(transformFunc, false)
-    val realTransformFunc =  (rdds: Seq[RDD[_]], time: Time) => {
+    val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
       assert(rdds.length == 1)
       cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/3af0b313/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index eca69f0..6c1fab5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -69,7 +69,7 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils}
  *   processing semantics are undefined.
  */
 private[streaming]
-class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
+class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
     @transient ssc_ : StreamingContext,
     directory: String,
     filter: Path => Boolean = FileInputDStream.defaultFilter,
@@ -251,7 +251,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
 
   /** Generate one RDD from an array of files */
   private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
-    val fileRDDs = files.map(file =>{
+    val fileRDDs = files.map { file =>
       val rdd = serializableConfOpt.map(_.value) match {
         case Some(config) => context.sparkContext.newAPIHadoopFile(
           file,
@@ -267,7 +267,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
           "Refer to the streaming programming guide for more details.")
       }
       rdd
-    })
+    }
     new UnionRDD(context.sparkContext, fileRDDs)
   }
 
@@ -294,7 +294,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
   private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
     logDebug(this.getClass().getSimpleName + ".readObject used")
     ois.defaultReadObject()
-    generatedRDDs = new mutable.HashMap[Time, RDD[(K,V)]] ()
+    generatedRDDs = new mutable.HashMap[Time, RDD[(K, V)]]()
     batchTimeToSelectedFiles =
       new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
     recentlySelectedFiles = new mutable.HashSet[String]()

http://git-wip-us.apache.org/repos/asf/spark/blob/3af0b313/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index fda22eb..358e4c6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -32,7 +32,7 @@ import org.apache.spark.streaming.StreamingContext.rddToFileName
 /**
  * Extra functions available on DStream of (key, value) pairs through an implicit conversion.
  */
-class PairDStreamFunctions[K, V](self: DStream[(K,V)])
+class PairDStreamFunctions[K, V](self: DStream[(K, V)])
     (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K])
   extends Serializable
 {

http://git-wip-us.apache.org/repos/asf/spark/blob/3af0b313/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index df9f7f1..6a583bf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -38,7 +38,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
     _windowDuration: Duration,
     _slideDuration: Duration,
     partitioner: Partitioner
-  ) extends DStream[(K,V)](parent.ssc) {
+  ) extends DStream[(K, V)](parent.ssc) {
 
   require(_windowDuration.isMultipleOf(parent.slideDuration),
     "The window duration of ReducedWindowedDStream (" + _windowDuration + ") " +
@@ -58,7 +58,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
   super.persist(StorageLevel.MEMORY_ONLY_SER)
   reducedStream.persist(StorageLevel.MEMORY_ONLY_SER)
 
-  def windowDuration: Duration =  _windowDuration
+  def windowDuration: Duration = _windowDuration
 
   override def dependencies: List[DStream[_]] = List(reducedStream)
 
@@ -68,7 +68,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
 
   override def parentRememberDuration: Duration = rememberDuration + windowDuration
 
-  override def persist(storageLevel: StorageLevel): DStream[(K,V)] = {
+  override def persist(storageLevel: StorageLevel): DStream[(K, V)] = {
     super.persist(storageLevel)
     reducedStream.persist(storageLevel)
     this
@@ -118,7 +118,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
 
     // Get the RDD of the reduced value of the previous window
     val previousWindowRDD =
-      getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]()))
+      getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K, V)]()))
 
     // Make the list of RDDs that needs to cogrouped together for reducing their reduced values
     val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs

http://git-wip-us.apache.org/repos/asf/spark/blob/3af0b313/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
index 7757cca..e0ffd5d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
@@ -25,19 +25,19 @@ import scala.reflect.ClassTag
 
 private[streaming]
 class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
-    parent: DStream[(K,V)],
+    parent: DStream[(K, V)],
     createCombiner: V => C,
     mergeValue: (C, V) => C,
     mergeCombiner: (C, C) => C,
     partitioner: Partitioner,
     mapSideCombine: Boolean = true
-  ) extends DStream[(K,C)] (parent.ssc) {
+  ) extends DStream[(K, C)] (parent.ssc) {
 
   override def dependencies: List[DStream[_]] = List(parent)
 
   override def slideDuration: Duration = parent.slideDuration
 
-  override def compute(validTime: Time): Option[RDD[(K,C)]] = {
+  override def compute(validTime: Time): Option[RDD[(K, C)]] = {
     parent.getOrCompute(validTime) match {
       case Some(rdd) => Some(rdd.combineByKey[C](
           createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))

http://git-wip-us.apache.org/repos/asf/spark/blob/3af0b313/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index de8718d..621d6df 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -51,7 +51,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
     val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => {
       val i = iterator.map(t => {
         val itr = t._2._2.iterator
-        val headOption = if(itr.hasNext) Some(itr.next) else None
+        val headOption = if (itr.hasNext) Some(itr.next()) else None
         (t._1, t._2._1.toSeq, headOption)
       })
       updateFuncLocal(i)

http://git-wip-us.apache.org/repos/asf/spark/blob/3af0b313/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index 899865a..4efba03 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -44,7 +44,7 @@ class WindowedDStream[T: ClassTag](
   // Persist parent level by default, as those RDDs are going to be obviously reused.
   parent.persist(StorageLevel.MEMORY_ONLY_SER)
 
-  def windowDuration: Duration =  _windowDuration
+  def windowDuration: Duration = _windowDuration
 
   override def dependencies: List[DStream[_]] = List(parent)
 
@@ -68,7 +68,7 @@ class WindowedDStream[T: ClassTag](
       new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
     } else {
       logDebug("Using normal union for windowing at " + validTime)
-      new UnionRDD(ssc.sc,rddsInWindow)
+      new UnionRDD(ssc.sc, rddsInWindow)
     }
     Some(windowRDD)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3af0b313/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 4bebcc5..0588517 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -164,7 +164,7 @@ private[streaming] class BlockGenerator(
   private def keepPushingBlocks() {
     logInfo("Started block pushing thread")
     try {
-      while(!stopped) {
+      while (!stopped) {
         Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
           case Some(block) => pushBlock(block)
           case None =>

http://git-wip-us.apache.org/repos/asf/spark/blob/3af0b313/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
index 97db9de..8df542b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.streaming.receiver
 
+import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter}
+
 import org.apache.spark.{Logging, SparkConf}
-import com.google.common.util.concurrent.{RateLimiter=>GuavaRateLimiter}
 
 /** Provides waitToPush() method to limit the rate at which receivers consume data.
   *

http://git-wip-us.apache.org/repos/asf/spark/blob/3af0b313/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index f73f7e7..f1504b0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -230,7 +230,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
   class ReceiverLauncher {
     @transient val env = ssc.env
     @volatile @transient private var running = false
-    @transient val thread  = new Thread() {
+    @transient val thread = new Thread() {
       override def run() {
         try {
           SparkEnv.set(env)

http://git-wip-us.apache.org/repos/asf/spark/blob/3af0b313/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 87ba4f8..fe6328b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -200,7 +200,7 @@ private[streaming] class FileBasedWriteAheadLog(
   /** Initialize the log directory or recover existing logs inside the directory */
   private def initializeOrRecover(): Unit = synchronized {
     val logDirectoryPath = new Path(logDirectory)
-    val fileSystem =  HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
+    val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
 
     if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
       val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })

http://git-wip-us.apache.org/repos/asf/spark/blob/3af0b313/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
index 4d968f8..4089366 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
@@ -27,7 +27,7 @@ object RawTextHelper {
    * Splits lines and counts the words.
    */
   def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = {
-    val map = new OpenHashMap[String,Long]
+    val map = new OpenHashMap[String, Long]
     var i = 0
     var j = 0
     while (iter.hasNext) {
@@ -98,7 +98,7 @@ object RawTextHelper {
    * before real workload starts.
    */
   def warmUp(sc: SparkContext) {
-    for(i <- 0 to 1) {
+    for (i <- 0 to 1) {
       sc.parallelize(1 to 200000, 1000)
         .map(_ % 1331).map(_.toString)
         .mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)

http://git-wip-us.apache.org/repos/asf/spark/blob/3af0b313/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index f269cb7..08faeaa 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -255,7 +255,7 @@ class BasicOperationsSuite extends TestSuiteBase {
       Seq(  )
     )
     val operation = (s1: DStream[String], s2: DStream[String]) => {
-      s1.map(x => (x,1)).cogroup(s2.map(x => (x, "x"))).mapValues(x => (x._1.toSeq, x._2.toSeq))
+      s1.map(x => (x, 1)).cogroup(s2.map(x => (x, "x"))).mapValues(x => (x._1.toSeq, x._2.toSeq))
     }
     testOperation(inputData1, inputData2, operation, outputData, true)
   }
@@ -427,9 +427,9 @@ class BasicOperationsSuite extends TestSuiteBase {
   test("updateStateByKey - object lifecycle") {
     val inputData =
       Seq(
-        Seq("a","b"),
+        Seq("a", "b"),
         null,
-        Seq("a","c","a"),
+        Seq("a", "c", "a"),
         Seq("c"),
         null,
         null

http://git-wip-us.apache.org/repos/asf/spark/blob/3af0b313/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 0122514..b74d67c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -418,7 +418,7 @@ class TestServer(portToBind: Int = 0) extends Logging {
   val servingThread = new Thread() {
     override def run() {
       try {
-        while(true) {
+        while (true) {
           logInfo("Accepting connections on port " + port)
           val clientSocket = serverSocket.accept()
           if (startLatch.getCount == 1) {

http://git-wip-us.apache.org/repos/asf/spark/blob/3af0b313/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index f8e8030..e36c791 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -732,7 +732,9 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
 
   def onStop() {
     // Simulate slow receiver by waiting for all records to be produced
-    while(!SlowTestReceiver.receivedAllRecords) Thread.sleep(100)
+    while (!SlowTestReceiver.receivedAllRecords) {
+      Thread.sleep(100)
+    }
     // no clean to be done, the receiving thread should stop on it own
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3af0b313/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 312cce4..1dc8960 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -133,8 +133,10 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
 
   /** Check if a sequence of numbers is in increasing order */
   def isInIncreasingOrder(seq: Seq[Long]): Boolean = {
-    for(i <- 1 until seq.size) {
-      if (seq(i - 1) > seq(i)) return false
+    for (i <- 1 until seq.size) {
+      if (seq(i - 1) > seq(i)) {
+        return false
+      }
     }
     true
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3af0b313/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
index 2a0f458..c9175d6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -64,7 +64,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     listener.numTotalReceivedRecords should be (0)
 
     // onBatchStarted
-    val batchInfoStarted = BatchInfo(Time(1000), streamIdToNumRecords,  1000, Some(2000), None)
+    val batchInfoStarted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
     listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
     listener.waitingBatches should be (Nil)
     listener.runningBatches should be (List(BatchUIData(batchInfoStarted)))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

