spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject [3/6] git commit: Adding tests
Date Fri, 25 Oct 2013 00:08:51 GMT
Adding tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/05ac9940
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/05ac9940
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/05ac9940

Branch: refs/heads/master
Commit: 05ac9940ee97744b8952ede74edfcd63e6e55a5b
Parents: 2fda84f
Author: Patrick Wendell <pwendell@gmail.com>
Authored: Thu Oct 24 14:29:19 2013 -0700
Committer: Patrick Wendell <pwendell@gmail.com>
Committed: Thu Oct 24 14:31:34 2013 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/RDDSuite.scala   | 13 -----
 .../spark/streaming/BasicOperationsSuite.scala  | 38 ++++++++++++++
 .../apache/spark/streaming/TestSuiteBase.scala  | 55 ++++++++++++++++++--
 3 files changed, 88 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/05ac9940/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index fd00183..354ab8a 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -157,19 +157,6 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(partitions2(0).length > 0)
     assert(partitions2(19).length > 0)
     assert(repartitioned2.collect().toSet === (1 to 1000).toSet)
-
-    // Coalesce partitions - no shuffle
-    val repartitioned3 = data.repartition(2, skipShuffle = true)
-    assert(repartitioned3.partitions.size == 2)
-    val partitions3 = repartitioned3.glom().collect()
-    assert(partitions3(0).toList === (1 to 500).toList)
-    assert(partitions3(1).toList === (501 to 1000).toList)
-    assert(repartitioned3.collect().toSet === (1 to 1000).toSet)
-
-    // Split partitions - no shuffle (should throw exn)
-    intercept[IllegalArgumentException] {
-      data.repartition(20, skipShuffle = true)
-    }
   }
 
   test("coalesced RDDs") {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/05ac9940/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 11586f7..55cfcb3 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -82,6 +82,44 @@ class BasicOperationsSuite extends TestSuiteBase {
     testOperation(input, operation, output, true)
   }
 
+  test("repartition (more partitions)") {
+    val input = Seq(1 to 100, 101 to 200, 201 to 300)
+    val operation = (r: DStream[Int]) => r.repartition(5)
+    val ssc = setupStreams(input, operation, 2)
+    val output = runStreamsWithPartitions(ssc, 3, 3)
+    assert(output.size === 3)
+    val first = output(0)
+    val second = output(1)
+    val third = output(2)
+
+    assert(first.size === 5)
+    assert(second.size === 5)
+    assert(third.size === 5)
+
+    assert(first.flatten.toSet === (1 to 100).toSet)
+    assert(second.flatten.toSet === (101 to 200).toSet)
+    assert(third.flatten.toSet === (201 to 300).toSet)
+  }
+
+  test("repartition (fewer partitions)") {
+    val input = Seq(1 to 100, 101 to 200, 201 to 300)
+    val operation = (r: DStream[Int]) => r.repartition(2)
+    val ssc = setupStreams(input, operation, 5)
+    val output = runStreamsWithPartitions(ssc, 3, 3)
+    assert(output.size === 3)
+    val first = output(0)
+    val second = output(1)
+    val third = output(2)
+
+    assert(first.size === 2)
+    assert(second.size === 2)
+    assert(third.size === 2)
+
+    assert(first.flatten.toSet === (1 to 100).toSet)
+    assert(second.flatten.toSet === (101 to 200).toSet)
+    assert(third.flatten.toSet === (201 to 300).toSet)
+  }
+
   test("groupByKey") {
     testOperation(
       Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ),

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/05ac9940/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 37dd9c4..26f515a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -60,6 +60,8 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[
 /**
  * This is a output stream just for the testsuites. All the output is collected into a
  * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
+ *
+ * The buffer contains a sequence of RDD's, each containing a sequence of items
  */
 class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[T]])
   extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
@@ -76,6 +78,27 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBu
 }
 
 /**
+ * This is a output stream just for the testsuites. All the output is collected into a
+ * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
+ *
+ * The buffer contains a sequence of RDD's, each containing a sequence of partitions, each
+ * containing a sequence of items.
+ */
+class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[Seq[T]]])
+  extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
+    val collected = rdd.glom().collect().map(_.toSeq)
+    output += collected
+  }) {
+
+  // This is to clear the output buffer every time it is read from a checkpoint
+  @throws(classOf[IOException])
+  private def readObject(ois: ObjectInputStream) {
+    ois.defaultReadObject()
+    output.clear()
+  }
+}
+
+/**
  * This is the base trait for Spark Streaming testsuites. This provides basic functionality
  * to run user-defined set of input on user-defined stream operations, and verify the output.
  */
@@ -108,7 +131,8 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
    */
   def setupStreams[U: ClassManifest, V: ClassManifest](
       input: Seq[Seq[U]],
-      operation: DStream[U] => DStream[V]
+      operation: DStream[U] => DStream[V],
+      numPartitions: Int = numInputPartitions
     ): StreamingContext = {
 
     // Create StreamingContext
@@ -118,9 +142,10 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
     }
 
     // Setup the stream computation
-    val inputStream = new TestInputStream(ssc, input, numInputPartitions)
+    val inputStream = new TestInputStream(ssc, input, numPartitions)
     val operatedStream = operation(inputStream)
-    val outputStream = new TestOutputStream(operatedStream, new ArrayBuffer[Seq[V]] with SynchronizedBuffer[Seq[V]])
+    val outputStream = new TestOutputStreamWithPartitions(operatedStream,
+      new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]])
     ssc.registerInputStream(inputStream)
     ssc.registerOutputStream(outputStream)
     ssc
@@ -146,7 +171,8 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
     val inputStream1 = new TestInputStream(ssc, input1, numInputPartitions)
     val inputStream2 = new TestInputStream(ssc, input2, numInputPartitions)
     val operatedStream = operation(inputStream1, inputStream2)
-    val outputStream = new TestOutputStream(operatedStream, new ArrayBuffer[Seq[W]] with SynchronizedBuffer[Seq[W]])
+    val outputStream = new TestOutputStreamWithPartitions(operatedStream,
+      new ArrayBuffer[Seq[Seq[W]]] with SynchronizedBuffer[Seq[Seq[W]]])
     ssc.registerInputStream(inputStream1)
     ssc.registerInputStream(inputStream2)
     ssc.registerOutputStream(outputStream)
@@ -157,18 +183,37 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
    * Runs the streams set up in `ssc` on manual clock for `numBatches` batches and
    * returns the collected output. It will wait until `numExpectedOutput` number of
    * output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached.
+   *
+   * Returns a sequence of items for each RDD.
    */
   def runStreams[V: ClassManifest](
       ssc: StreamingContext,
       numBatches: Int,
       numExpectedOutput: Int
     ): Seq[Seq[V]] = {
+    // Flatten each RDD into a single Seq
+    runStreamsWithPartitions(ssc, numBatches, numExpectedOutput).map(_.flatten.toSeq)
+  }
+
+  /**
+   * Runs the streams set up in `ssc` on manual clock for `numBatches` batches and
+   * returns the collected output. It will wait until `numExpectedOutput` number of
+   * output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached.
+   *
+   * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each
+   * representing one partition.
+   */
+  def runStreamsWithPartitions[V: ClassManifest](
+      ssc: StreamingContext,
+      numBatches: Int,
+      numExpectedOutput: Int
+    ): Seq[Seq[Seq[V]]] = {
     assert(numBatches > 0, "Number of batches to run stream computation is zero")
     assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero")
     logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput)
 
     // Get the output buffer
-    val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
+    val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
     val output = outputStream.output
 
     try {


Mime
View raw message