spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject [5/6] git commit: Some clean-up of tests
Date Fri, 25 Oct 2013 00:08:53 GMT
Some clean-up of tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/39f6f755
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/39f6f755
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/39f6f755

Branch: refs/heads/master
Commit: 39f6f75588b69f07cd963c5e211045fed103695b
Parents: 9423532
Author: Patrick Wendell <pwendell@gmail.com>
Authored: Thu Oct 24 16:43:33 2013 -0700
Committer: Patrick Wendell <pwendell@gmail.com>
Committed: Thu Oct 24 16:43:33 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/spark/streaming/JavaTestUtils.scala   |  3 +--
 .../org/apache/spark/streaming/CheckpointSuite.scala      |  4 ++--
 .../scala/org/apache/spark/streaming/TestSuiteBase.scala  | 10 +++++++---
 3 files changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/39f6f755/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
index 8a66049..5344ae7 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
@@ -54,8 +54,7 @@ trait JavaTestBase extends TestSuiteBase {
   {
     implicit val cm: ClassManifest[T] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
-    val ostream = new TestOutputStream(dstream.dstream,
-      new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]])
+    val ostream = new TestOutputStreamWithPartitions(dstream.dstream)
     dstream.dstream.ssc.registerOutputStream(ostream)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/39f6f755/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index a327de8..beb2083 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -366,7 +366,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
     logInfo("Manual clock after advancing = " + clock.time)
     Thread.sleep(batchDuration.milliseconds)
 
-    val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
-    outputStream.output
+    val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
+    outputStream.output.map(_.flatten)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/39f6f755/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 26f515a..be14069 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -63,7 +63,8 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input:
Seq[Seq[
  *
  * The buffer contains a sequence of RDD's, each containing a sequence of items
  */
-class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[T]])
+class TestOutputStream[T: ClassManifest](parent: DStream[T],
+    val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]())
   extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
     val collected = rdd.collect()
     output += collected
@@ -82,9 +83,10 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output:
ArrayBu
  * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
  *
  * The buffer contains a sequence of RDD's, each containing a sequence of partitions, each
- * containing a sequnce of items.
+ * containing a sequence of items.
  */
-class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[Seq[T]]])
+class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T],
+    val output: ArrayBuffer[Seq[Seq[T]]] = ArrayBuffer[Seq[Seq[T]]]())
   extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
     val collected = rdd.glom().collect().map(_.toSeq)
     output += collected
@@ -96,6 +98,8 @@ class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T],
val o
     ois.defaultReadObject()
     output.clear()
   }
+
+  def toTestOutputStream = new TestOutputStream[T](this.parent, this.output.map(_.flatten))
 }
 
 /**


Mime
View raw message