spark-commits mailing list archives

From r...@apache.org
Subject spark git commit: [SPARK-6765] Fix test code style for streaming.
Date Wed, 08 Apr 2015 07:25:02 GMT
Repository: spark
Updated Branches:
  refs/heads/master 8d2a36c0f -> 15e0d2bd1


[SPARK-6765] Fix test code style for streaming.

So we can turn the style checker on for test code.
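
The changes follow two recurring style patterns: public members gain explicit
return types instead of relying on type inference, and over-long lines are
wrapped to stay within the style checker's line-length limit. A minimal
before/after sketch of both patterns, drawn from the diff below (illustrative
only, not part of the patch):

    // Before: inferred return type on an overridden public member
    override def batchDuration = Milliseconds(500)
    // After: explicit return type
    override def batchDuration: Duration = Milliseconds(500)

    // Before: one over-long class declaration
    // class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with ... {
    // After: the declaration wrapped onto a continuation line
    class UISeleniumSuite
      extends FunSuite with WebBrowser with Matchers with BeforeAndAfterAll with TestSuiteBase {
      // ...
    }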

Author: Reynold Xin <rxin@databricks.com>

Closes #5409 from rxin/test-style-streaming and squashes the following commits:

7aea69b [Reynold Xin] [SPARK-6765] Fix test code style for streaming.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/15e0d2bd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/15e0d2bd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/15e0d2bd

Branch: refs/heads/master
Commit: 15e0d2bd1304db62fad286c1bb687e87c361e16c
Parents: 8d2a36c
Author: Reynold Xin <rxin@databricks.com>
Authored: Wed Apr 8 00:24:59 2015 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Wed Apr 8 00:24:59 2015 -0700

----------------------------------------------------------------------
 .../flume/FlumePollingStreamSuite.scala         | 29 ++++++-------
 .../streaming/flume/FlumeStreamSuite.scala      |  4 +-
 .../spark/streaming/mqtt/MQTTStreamSuite.scala  |  3 +-
 .../spark/streaming/BasicOperationsSuite.scala  |  6 ++-
 .../spark/streaming/CheckpointSuite.scala       | 45 +++++++++++++++-----
 .../apache/spark/streaming/FailureSuite.scala   |  4 +-
 .../spark/streaming/InputStreamsSuite.scala     | 15 ++++---
 .../streaming/ReceivedBlockHandlerSuite.scala   |  4 +-
 .../streaming/ReceivedBlockTrackerSuite.scala   |  6 ++-
 .../apache/spark/streaming/ReceiverSuite.scala  | 11 ++---
 .../spark/streaming/StreamingContextSuite.scala |  5 ++-
 .../streaming/StreamingListenerSuite.scala      |  4 +-
 .../apache/spark/streaming/TestSuiteBase.scala  | 28 ++++++------
 .../spark/streaming/UISeleniumSuite.scala       |  3 +-
 .../spark/streaming/WindowOperationsSuite.scala |  4 +-
 .../rdd/WriteAheadLogBackedBlockRDDSuite.scala  | 12 ++++--
 .../streaming/scheduler/JobGeneratorSuite.scala |  2 +-
 .../streaming/util/WriteAheadLogSuite.scala     |  2 +-
 .../spark/streamingtest/ImplicitSuite.scala     |  3 +-
 19 files changed, 115 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/15e0d2bd/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index e04d408..2edea9b 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -1,21 +1,20 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
+
 package org.apache.spark.streaming.flume
 
 import java.net.InetSocketAddress
@@ -213,7 +212,7 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
     assert(counter === totalEventsPerChannel * channels.size)
   }
 
-  def assertChannelIsEmpty(channel: MemoryChannel) = {
+  def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
     val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
     queueRemaining.setAccessible(true)
     val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")

http://git-wip-us.apache.org/repos/asf/spark/blob/15e0d2bd/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 51d273a..39e6754 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -151,7 +151,9 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L
   }
 
   /** Class to create socket channel with compression */
-  private class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory {
+  private class CompressionChannelFactory(compressionLevel: Int)
+    extends NioClientSocketChannelFactory {
+
     override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
       val encoder = new ZlibEncoder(compressionLevel)
       pipeline.addFirst("deflater", encoder)

http://git-wip-us.apache.org/repos/asf/spark/blob/15e0d2bd/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 24d78ec..a19a72c 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -139,7 +139,8 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
             msgTopic.publish(message)
           } catch {
            case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
-              Thread.sleep(50) // wait for Spark streaming to consume something from the message queue
+              // wait for Spark streaming to consume something from the message queue
+              Thread.sleep(50)
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/15e0d2bd/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index cf19171..87bc20f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -171,7 +171,9 @@ class BasicOperationsSuite extends TestSuiteBase {
   test("flatMapValues") {
     testOperation(
       Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ),
-      (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _).flatMapValues(x => Seq(x, x + 10)),
+      (s: DStream[String]) => {
+        s.map(x => (x, 1)).reduceByKey(_ + _).flatMapValues(x => Seq(x, x + 10))
+      },
       Seq( Seq(("a", 2), ("a", 12), ("b", 1), ("b", 11)), Seq(("", 2), ("", 12)), Seq() ),
       true
     )
@@ -474,7 +476,7 @@ class BasicOperationsSuite extends TestSuiteBase {
       stream.foreachRDD(_ => {})  // Dummy output stream
       ssc.start()
       Thread.sleep(2000)
-      def getInputFromSlice(fromMillis: Long, toMillis: Long) = {
+      def getInputFromSlice(fromMillis: Long, toMillis: Long): Set[Int] = {
         stream.slice(new Time(fromMillis), new Time(toMillis)).flatMap(_.collect()).toSet
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/15e0d2bd/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 91a2b2b..54c3044 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -43,7 +43,7 @@ class CheckpointSuite extends TestSuiteBase {
 
   var ssc: StreamingContext = null
 
-  override def batchDuration = Milliseconds(500)
+  override def batchDuration: Duration = Milliseconds(500)
 
   override def beforeFunction() {
     super.beforeFunction()
@@ -72,7 +72,7 @@ class CheckpointSuite extends TestSuiteBase {
     val input = (1 to 10).map(_ => Seq("a")).toSeq
     val operation = (st: DStream[String]) => {
       val updateFunc = (values: Seq[Int], state: Option[Int]) => {
-        Some((values.sum + state.getOrElse(0)))
+        Some(values.sum + state.getOrElse(0))
       }
       st.map(x => (x, 1))
       .updateStateByKey(updateFunc)
@@ -199,7 +199,12 @@ class CheckpointSuite extends TestSuiteBase {
     testCheckpointedOperation(
       Seq( Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()
),
       (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _),
-      Seq( Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("",
2)), Seq() ),
+      Seq(
+        Seq(("a", 2), ("b", 1)),
+        Seq(("", 2)),
+        Seq(),
+        Seq(("a", 2), ("b", 1)),
+        Seq(("", 2)), Seq() ),
       3
     )
   }
@@ -212,7 +217,8 @@ class CheckpointSuite extends TestSuiteBase {
     val n = 10
     val w = 4
     val input = (1 to n).map(_ => Seq("a")).toSeq
-    val output = Seq(Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 3))) ++ (1 to (n - w + 1)).map(x => Seq(("a", 4)))
+    val output = Seq(
+      Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 3))) ++ (1 to (n - w + 1)).map(x => Seq(("a", 4)))
     val operation = (st: DStream[String]) => {
       st.map(x => (x, 1))
         .reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration)
@@ -236,7 +242,13 @@ class CheckpointSuite extends TestSuiteBase {
             classOf[TextOutputFormat[Text, IntWritable]])
           output
         },
-        Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("",
2)), Seq()),
+        Seq(
+          Seq(("a", 2), ("b", 1)),
+          Seq(("", 2)),
+          Seq(),
+          Seq(("a", 2), ("b", 1)),
+          Seq(("", 2)),
+          Seq()),
         3
       )
     } finally {
@@ -259,7 +271,13 @@ class CheckpointSuite extends TestSuiteBase {
             classOf[NewTextOutputFormat[Text, IntWritable]])
           output
         },
-        Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("",
2)), Seq()),
+        Seq(
+          Seq(("a", 2), ("b", 1)),
+          Seq(("", 2)),
+          Seq(),
+          Seq(("a", 2), ("b", 1)),
+          Seq(("", 2)),
+          Seq()),
         3
       )
     } finally {
@@ -298,7 +316,13 @@ class CheckpointSuite extends TestSuiteBase {
             output
           }
         },
-        Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("",
2)), Seq()),
+        Seq(
+          Seq(("a", 2), ("b", 1)),
+          Seq(("", 2)),
+          Seq(),
+          Seq(("a", 2), ("b", 1)),
+          Seq(("", 2)),
+          Seq()),
         3
       )
     } finally {
@@ -533,7 +557,8 @@ class CheckpointSuite extends TestSuiteBase {
    * Advances the manual clock on the streaming scheduler by given number of batches.
    * It also waits for the expected amount of time for each batch.
    */
-  def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
+  def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] =
+  {
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     logInfo("Manual clock before advancing = " + clock.getTimeMillis())
     for (i <- 1 to numBatches.toInt) {
@@ -543,7 +568,7 @@ class CheckpointSuite extends TestSuiteBase {
     logInfo("Manual clock after advancing = " + clock.getTimeMillis())
     Thread.sleep(batchDuration.milliseconds)
 
-    val outputStream = ssc.graph.getOutputStreams.filter { dstream =>
+    val outputStream = ssc.graph.getOutputStreams().filter { dstream =>
       dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]
     }.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
     outputStream.output.map(_.flatten)
@@ -552,4 +577,4 @@ class CheckpointSuite extends TestSuiteBase {
 
 private object CheckpointSuite extends Serializable {
   var batchThreeShouldBlockIndefinitely: Boolean = true
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/15e0d2bd/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
index 26435d8..0c4c065 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
@@ -29,9 +29,9 @@ class FailureSuite extends TestSuiteBase with Logging {
   val directory = Utils.createTempDir()
   val numBatches = 30
 
-  override def batchDuration = Milliseconds(1000)
+  override def batchDuration: Duration = Milliseconds(1000)
 
-  override def useManualClock = false
+  override def useManualClock: Boolean = false
 
   override def afterFunction() {
     Utils.deleteRecursively(directory)

http://git-wip-us.apache.org/repos/asf/spark/blob/15e0d2bd/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 7ed6320..e6ac497 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -52,7 +52,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
       "localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
     val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
     val outputStream = new TestOutputStream(networkStream, outputBuffer)
-    def output = outputBuffer.flatMap(x => x)
+    def output: ArrayBuffer[String] = outputBuffer.flatMap(x => x)
     outputStream.register()
     ssc.start()
 
@@ -164,7 +164,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     val countStream = networkStream.count
     val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]]
     val outputStream = new TestOutputStream(countStream, outputBuffer)
-    def output = outputBuffer.flatMap(x => x)
+    def output: ArrayBuffer[Long] = outputBuffer.flatMap(x => x)
     outputStream.register()
     ssc.start()
 
@@ -196,7 +196,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     val queueStream = ssc.queueStream(queue, oneAtATime = true)
     val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
     val outputStream = new TestOutputStream(queueStream, outputBuffer)
-    def output = outputBuffer.filter(_.size > 0)
+    def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0)
     outputStream.register()
     ssc.start()
 
@@ -204,7 +204,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     val input = Seq("1", "2", "3", "4", "5")
     val expectedOutput = input.map(Seq(_))
-    //Thread.sleep(1000)
+
     val inputIterator = input.toIterator
     for (i <- 0 until input.size) {
       // Enqueue more than 1 item per tick but they should dequeue one at a time
@@ -239,7 +239,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     val queueStream = ssc.queueStream(queue, oneAtATime = false)
     val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
     val outputStream = new TestOutputStream(queueStream, outputBuffer)
-    def output = outputBuffer.filter(_.size > 0)
+    def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0)
     outputStream.register()
     ssc.start()
 
@@ -352,7 +352,8 @@ class TestServer(portToBind: Int = 0) extends Logging {
           logInfo("New connection")
           try {
             clientSocket.setTcpNoDelay(true)
-            val outputStream = new BufferedWriter(new OutputStreamWriter(clientSocket.getOutputStream))
+            val outputStream = new BufferedWriter(
+              new OutputStreamWriter(clientSocket.getOutputStream))
 
             while(clientSocket.isConnected) {
               val msg = queue.poll(100, TimeUnit.MILLISECONDS)
@@ -384,7 +385,7 @@ class TestServer(portToBind: Int = 0) extends Logging {
 
   def stop() { servingThread.interrupt() }
 
-  def port = serverSocket.getLocalPort
+  def port: Int = serverSocket.getLocalPort
 }
 
 /** This is a receiver to test multiple threads inserting data using block generator */

http://git-wip-us.apache.org/repos/asf/spark/blob/15e0d2bd/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index ef4873d..c090eae 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -96,7 +96,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
       testBlockStoring(handler) { case (data, blockIds, storeResults) =>
         // Verify the data in block manager is correct
         val storedData = blockIds.flatMap { blockId =>
-          blockManager.getLocal(blockId).map { _.data.map {_.toString}.toList }.getOrElse(List.empty)
+          blockManager.getLocal(blockId).map(_.data.map(_.toString).toList).getOrElse(List.empty)
         }.toList
         storedData shouldEqual data
 
@@ -120,7 +120,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
       testBlockStoring(handler) { case (data, blockIds, storeResults) =>
         // Verify the data in block manager is correct
         val storedData = blockIds.flatMap { blockId =>
-          blockManager.getLocal(blockId).map { _.data.map {_.toString}.toList }.getOrElse(List.empty)
+          blockManager.getLocal(blockId).map(_.data.map(_.toString).toList).getOrElse(List.empty)
         }.toList
         storedData shouldEqual data
 

http://git-wip-us.apache.org/repos/asf/spark/blob/15e0d2bd/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 42fad76..b63b37d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -228,7 +228,8 @@ class ReceivedBlockTrackerSuite
   * Get all the data written in the given write ahead log files. By default, it will read all
    * files in the test log directory.
    */
-  def getWrittenLogData(logFiles: Seq[String] = getWriteAheadLogFiles): Seq[ReceivedBlockTrackerLogEvent] = {
+  def getWrittenLogData(logFiles: Seq[String] = getWriteAheadLogFiles)
+    : Seq[ReceivedBlockTrackerLogEvent] = {
     logFiles.flatMap {
       file => new WriteAheadLogReader(file, hadoopConf).toSeq
     }.map { byteBuffer =>
@@ -244,7 +245,8 @@ class ReceivedBlockTrackerSuite
   }
 
   /** Create batch allocation object from the given info */
-  def createBatchAllocation(time: Long, blockInfos: Seq[ReceivedBlockInfo]): BatchAllocationEvent = {
+  def createBatchAllocation(time: Long, blockInfos: Seq[ReceivedBlockInfo])
+    : BatchAllocationEvent = {
     BatchAllocationEvent(time, AllocatedBlocks(Map((streamId -> blockInfos))))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/15e0d2bd/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index aa20ad0..10c35cb 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -308,7 +308,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
     val errors = new ArrayBuffer[Throwable]
 
     /** Check if all data structures are clean */
-    def isAllEmpty = {
+    def isAllEmpty: Boolean = {
       singles.isEmpty && byteBuffers.isEmpty && iterators.isEmpty &&
         arrayBuffers.isEmpty && errors.isEmpty
     }
@@ -320,24 +320,21 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
     def pushBytes(
         bytes: ByteBuffer,
         optionalMetadata: Option[Any],
-        optionalBlockId: Option[StreamBlockId]
-      ) {
+        optionalBlockId: Option[StreamBlockId]) {
       byteBuffers += bytes
     }
 
     def pushIterator(
         iterator: Iterator[_],
         optionalMetadata: Option[Any],
-        optionalBlockId: Option[StreamBlockId]
-      ) {
+        optionalBlockId: Option[StreamBlockId]) {
       iterators += iterator
     }
 
     def pushArrayBuffer(
         arrayBuffer: ArrayBuffer[_],
         optionalMetadata: Option[Any],
-        optionalBlockId: Option[StreamBlockId]
-      ) {
+        optionalBlockId: Option[StreamBlockId]) {
       arrayBuffers +=  arrayBuffer
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/15e0d2bd/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 2e5005e..d1bbf39 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -213,7 +213,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     ssc = new StreamingContext(sc, Milliseconds(100))
     var runningCount = 0
     SlowTestReceiver.receivedAllRecords = false
-    //Create test receiver that sleeps in onStop()
+    // Create test receiver that sleeps in onStop()
     val totalNumRecords = 15
     val recordsPerSecond = 1
     val input = ssc.receiverStream(new SlowTestReceiver(totalNumRecords, recordsPerSecond))
@@ -370,7 +370,8 @@ object TestReceiver {
 }
 
 /** Custom receiver for testing whether a slow receiver can be shutdown gracefully or not */
-class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int) extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {
+class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
+  extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {
 
   var receivingThreadOption: Option[Thread] = None
 

http://git-wip-us.apache.org/repos/asf/spark/blob/15e0d2bd/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index f52562b..852e8bb 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -38,8 +38,8 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
 
   // To make sure that the processing start and end times in collected
   // information are different for successive batches
-  override def batchDuration = Milliseconds(100)
-  override def actuallyWait = true
+  override def batchDuration: Duration = Milliseconds(100)
+  override def actuallyWait: Boolean = true
 
   test("batch info reporting") {
     val ssc = setupStreams(input, operation)

http://git-wip-us.apache.org/repos/asf/spark/blob/15e0d2bd/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 3565d62..c3cae8a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -53,8 +53,9 @@ class TestInputStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]],
     val selectedInput = if (index < input.size) input(index) else Seq[T]()
 
     // lets us test cases where RDDs are not created
-    if (selectedInput == null)
+    if (selectedInput == null) {
       return None
+    }
 
     val rdd = ssc.sc.makeRDD(selectedInput, numPartitions)
     logInfo("Created RDD " + rdd.id + " with " + selectedInput)
@@ -104,7 +105,9 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
     output.clear()
   }
 
-  def toTestOutputStream = new TestOutputStream[T](this.parent, this.output.map(_.flatten))
+  def toTestOutputStream: TestOutputStream[T] = {
+    new TestOutputStream[T](this.parent, this.output.map(_.flatten))
+  }
 }
 
 /**
@@ -148,34 +151,34 @@ class BatchCounter(ssc: StreamingContext) {
 trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
 
   // Name of the framework for Spark context
-  def framework = this.getClass.getSimpleName
+  def framework: String = this.getClass.getSimpleName
 
   // Master for Spark context
-  def master = "local[2]"
+  def master: String = "local[2]"
 
   // Batch duration
-  def batchDuration = Seconds(1)
+  def batchDuration: Duration = Seconds(1)
 
   // Directory where the checkpoint data will be saved
-  lazy val checkpointDir = {
+  lazy val checkpointDir: String = {
     val dir = Utils.createTempDir()
     logDebug(s"checkpointDir: $dir")
     dir.toString
   }
 
   // Number of partitions of the input parallel collections created for testing
-  def numInputPartitions = 2
+  def numInputPartitions: Int = 2
 
   // Maximum time to wait before the test times out
-  def maxWaitTimeMillis = 10000
+  def maxWaitTimeMillis: Int = 10000
 
   // Whether to use manual clock or not
-  def useManualClock = true
+  def useManualClock: Boolean = true
 
   // Whether to actually wait in real time before changing manual clock
-  def actuallyWait = false
+  def actuallyWait: Boolean = false
 
-  //// A SparkConf to use in tests. Can be modified before calling setupStreams to configure things.
+  // A SparkConf to use in tests. Can be modified before calling setupStreams to configure things.
   val conf = new SparkConf()
     .setMaster(master)
     .setAppName(framework)
@@ -346,7 +349,8 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
 
       // Wait until expected number of output items have been generated
       val startTime = System.currentTimeMillis()
-      while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+      while (output.size < numExpectedOutput &&
+        System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
         logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput)
         ssc.awaitTerminationOrTimeout(50)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/15e0d2bd/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index 87a0395..998426e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -32,7 +32,8 @@ import org.apache.spark._
 /**
  * Selenium tests for the Spark Web UI.
  */
-class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with BeforeAndAfterAll with TestSuiteBase {
+class UISeleniumSuite
+  extends FunSuite with WebBrowser with Matchers with BeforeAndAfterAll with TestSuiteBase {
 
   implicit var webDriver: WebDriver = _
 

http://git-wip-us.apache.org/repos/asf/spark/blob/15e0d2bd/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
index a5d2bb2..c39ad05 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
@@ -22,9 +22,9 @@ import org.apache.spark.storage.StorageLevel
 
 class WindowOperationsSuite extends TestSuiteBase {
 
-  override def maxWaitTimeMillis = 20000  // large window tests can sometimes take longer
+  override def maxWaitTimeMillis: Int = 20000  // large window tests can sometimes take longer
 
-  override def batchDuration = Seconds(1)  // making sure its visible in this class
+  override def batchDuration: Duration = Seconds(1)  // making sure its visible in this class
 
   val largerSlideInput = Seq(
     Seq(("a", 1)),

http://git-wip-us.apache.org/repos/asf/spark/blob/15e0d2bd/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index 7a6a2f3..c3602a5 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -28,10 +28,13 @@ import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBloc
 import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter}
 import org.apache.spark.util.Utils
 
-class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
+class WriteAheadLogBackedBlockRDDSuite
+  extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
+
   val conf = new SparkConf()
     .setMaster("local[2]")
     .setAppName(this.getClass.getSimpleName)
+
   val hadoopConf = new Configuration()
 
   var sparkContext: SparkContext = null
@@ -86,7 +89,8 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll w
    * @param numPartitionsInWAL Number of partitions to write to the Write Ahead Log
    * @param testStoreInBM Test whether blocks read from log are stored back into block manager
    */
-  private def testRDD(numPartitionsInBM: Int, numPartitionsInWAL: Int, testStoreInBM: Boolean = false) {
+  private def testRDD(
+      numPartitionsInBM: Int, numPartitionsInWAL: Int, testStoreInBM: Boolean = false) {
     val numBlocks = numPartitionsInBM + numPartitionsInWAL
     val data = Seq.fill(numBlocks, 10)(scala.util.Random.nextString(50))
 
@@ -110,7 +114,7 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll w
       "Unexpected blocks in BlockManager"
     )
 
-    // Make sure that the right `numPartitionsInWAL` blocks are in write ahead logs, and other are not
+    // Make sure that the right `numPartitionsInWAL` blocks are in WALs, and other are not
     require(
       segments.takeRight(numPartitionsInWAL).forall(s =>
         new File(s.path.stripPrefix("file://")).exists()),
@@ -152,6 +156,6 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll w
   }
 
   private def generateFakeSegments(count: Int): Seq[WriteAheadLogFileSegment] = {
-    Array.fill(count)(new WriteAheadLogFileSegment("random", 0l, 0))
+    Array.fill(count)(new WriteAheadLogFileSegment("random", 0L, 0))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/15e0d2bd/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
index 4150b60..7865b06 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
@@ -90,7 +90,7 @@ class JobGeneratorSuite extends TestSuiteBase {
       val receiverTracker = ssc.scheduler.receiverTracker
 
       // Get the blocks belonging to a batch
-      def getBlocksOfBatch(batchTime: Long) = {
+      def getBlocksOfBatch(batchTime: Long): Seq[ReceivedBlockInfo] = {
         receiverTracker.getBlocksOfBatchAndStream(Time(batchTime), inputStream.id)
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/15e0d2bd/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 8335659..a3919c4 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -291,7 +291,7 @@ object WriteAheadLogSuite {
     manager
   }
 
-  /** Read data from a segments of a log file directly and return the list of byte buffers.*/
+  /** Read data from a segments of a log file directly and return the list of byte buffers. */
   def readDataManually(segments: Seq[WriteAheadLogFileSegment]): Seq[String] = {
     segments.map { segment =>
       val reader = HdfsUtils.getInputStream(segment.path, hadoopConf)

http://git-wip-us.apache.org/repos/asf/spark/blob/15e0d2bd/streaming/src/test/scala/org/apache/spark/streamingtest/ImplicitSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streamingtest/ImplicitSuite.scala b/streaming/src/test/scala/org/apache/spark/streamingtest/ImplicitSuite.scala
index d0bf328..d667504 100644
--- a/streaming/src/test/scala/org/apache/spark/streamingtest/ImplicitSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streamingtest/ImplicitSuite.scala
@@ -25,7 +25,8 @@ package org.apache.spark.streamingtest
  */
 class ImplicitSuite {
 
-  // We only want to test if `implict` works well with the compiler, so we don't need a real DStream.
+  // We only want to test if `implicit` works well with the compiler,
+  // so we don't need a real DStream.
   def mockDStream[T]: org.apache.spark.streaming.dstream.DStream[T] = null
 
   def testToPairDStreamFunctions(): Unit = {

