spark-commits mailing list archives

From: t...@apache.org
Subject: spark git commit: [SPARK-5731][Streaming][Test] Fix incorrect test in DirectKafkaStreamSuite
Date: Wed, 18 Feb 2015 06:44:21 GMT
Repository: spark
Updated Branches:
  refs/heads/master e50934f11 -> 3912d3324


[SPARK-5731][Streaming][Test] Fix incorrect test in DirectKafkaStreamSuite

The test was incorrect. Instead of counting the number of records, it counted the number of
partitions of the RDDs generated by the DStream, which was not its intention. I will be testing
this patch multiple times to understand its flakiness.
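
(For illustration only, not part of the original commit message: a minimal batch-Spark sketch,
with hypothetical names, of the difference between the two counts. Counting one collected entry
per partition yields the partition count, whereas collecting the records themselves yields the
record count the test is meant to verify.)

import org.apache.spark.{SparkConf, SparkContext}

object CountSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("count-sketch"))

    // 16 records spread over 4 partitions, mimicking the RDD of one batch.
    val rdd = sc.parallelize(Seq.fill(16)(("key", "value")), numSlices = 4)

    // Old approach: build one entry per partition and count the entries --
    // that counts partitions (4), not records (16).
    val perPartition = rdd.mapPartitions(iter => Iterator(iter.size)).collect()
    println(perPartition.size)   // 4  -> what the old assertion effectively accumulated
    println(perPartition.sum)    // 16

    // Fixed approach: collect the records themselves and count them.
    println(rdd.collect().size)  // 16 -> compared against the number of messages sent

    sc.stop()
  }
}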

PS: This was caused by my refactoring in https://github.com/apache/spark/pull/4384/

koeninger check it out.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #4597 from tdas/kafka-flaky-test and squashes the following commits:

d236235 [Tathagata Das] Unignored last test.
e9a1820 [Tathagata Das] fix test


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3912d332
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3912d332
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3912d332

Branch: refs/heads/master
Commit: 3912d332464dcd124c60b734724c34d9742466a4
Parents: e50934f
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Tue Feb 17 22:44:16 2015 -0800
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Tue Feb 17 22:44:16 2015 -0800

----------------------------------------------------------------------
 .../kafka/DirectKafkaStreamSuite.scala          | 28 +++++++++++---------
 1 file changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3912d332/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 9260944..17ca9d1 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -20,20 +20,21 @@ package org.apache.spark.streaming.kafka
 import java.io.File
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
 import kafka.serializer.StringDecoder
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.scalatest.concurrent.{Eventually, Timeouts}
+import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
-import org.apache.spark.streaming.dstream.{DStream, InputDStream}
+import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.util.Utils
-import kafka.common.TopicAndPartition
-import kafka.message.MessageAndMetadata
 
 class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   with BeforeAndAfter with BeforeAndAfterAll with Eventually {
@@ -67,13 +68,14 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   }
 
 
-  ignore("basic stream receiving with multiple topics and smallest starting offset") {
+  test("basic stream receiving with multiple topics and smallest starting offset") {
     val topics = Set("basic1", "basic2", "basic3")
     val data = Map("a" -> 7, "b" -> 9)
     topics.foreach { t =>
       createTopic(t)
       sendMessages(t, data)
     }
+    val totalSent = data.values.sum * topics.size
     val kafkaParams = Map(
       "metadata.broker.list" -> s"$brokerAddress",
       "auto.offset.reset" -> "smallest"
@@ -84,7 +86,8 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
       KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
         ssc, kafkaParams, topics)
     }
-    var total = 0L
+
+    val allReceived = new ArrayBuffer[(String, String)]
 
     stream.foreachRDD { rdd =>
     // Get the offset ranges in the RDD
@@ -104,16 +107,17 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
       collected.foreach { case (partSize, rangeSize) =>
         assert(partSize === rangeSize, "offset ranges are wrong")
       }
-      total += collected.size  // Add up all the collected items
     }
+    stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
     ssc.start()
     eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
-      assert(total === data.values.sum * topics.size, "didn't get all messages")
+      assert(allReceived.size === totalSent,
+        "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n"))
     }
     ssc.stop()
   }
 
-  ignore("receiving from largest starting offset") {
+  test("receiving from largest starting offset") {
     val topic = "largest"
     val topicPartition = TopicAndPartition(topic, 0)
     val data = Map("a" -> 10)
@@ -158,7 +162,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   }
 
 
-  ignore("creating stream by offset") {
+  test("creating stream by offset") {
     val topic = "offset"
     val topicPartition = TopicAndPartition(topic, 0)
     val data = Map("a" -> 10)
@@ -204,7 +208,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   }
 
   // Test to verify the offset ranges can be recovered from the checkpoints
-  ignore("offset recovery") {
+  test("offset recovery") {
     val topic = "recovery"
     createTopic(topic)
     testDir = Utils.createTempDir()


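(As a standalone illustration of the corrected counting pattern -- a sketch only, using a
queue-backed input stream in place of the Kafka direct stream, with hypothetical names --
records from every batch are appended to a driver-side buffer and the assertion is retried
until the expected total arrives.)

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.scalatest.concurrent.Eventually._
import org.scalatest.time.{Millis, Seconds, Span}

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object StreamingCountSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("streaming-count-sketch")
    val ssc = new StreamingContext(conf, Milliseconds(200))

    // Stand-in for the Kafka direct stream: a queue of RDDs, one per simulated batch.
    val data = Map("a" -> 7, "b" -> 9)
    val totalSent = data.values.sum * 3
    val queue = new mutable.Queue[RDD[(String, String)]]()
    (1 to 3).foreach { _ =>
      queue += ssc.sparkContext.parallelize(
        data.toSeq.flatMap { case (k, n) => Seq.fill(n)((k, k)) }, numSlices = 4)
    }
    val stream = ssc.queueStream(queue)

    // Count records, not partitions: gather every batch's records into one buffer.
    val allReceived = new ArrayBuffer[(String, String)]
    stream.foreachRDD { rdd => allReceived ++= rdd.collect() }

    ssc.start()
    eventually(timeout(Span(20, Seconds)), interval(Span(200, Millis))) {
      assert(allReceived.size == totalSent,
        s"expected $totalSent records, got ${allReceived.size}")
    }
    ssc.stop()
  }
}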