spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zsxw...@apache.org
Subject spark git commit: [SPARK-18283][STRUCTURED STREAMING][KAFKA] Added test to check whether default starting offset in latest
Date Mon, 07 Nov 2016 18:44:08 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 b5d7217af -> 10525c294


[SPARK-18283][STRUCTURED STREAMING][KAFKA] Added test to check whether default starting offset
in latest

## What changes were proposed in this pull request?

Added test to check whether default starting offset in latest

## How was this patch tested?
new unit test

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #15778 from tdas/SPARK-18283.

(cherry picked from commit b06c23db9aedae48c9eba9d702ae82fa5647cfe5)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/10525c29
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/10525c29
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/10525c29

Branch: refs/heads/branch-2.0
Commit: 10525c2947d9d1593e77e6af692573b03de6a71f
Parents: b5d7217
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Mon Nov 7 10:43:36 2016 -0800
Committer: Shixiong Zhu <shixiong@databricks.com>
Committed: Mon Nov 7 10:44:05 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/kafka010/KafkaSourceSuite.scala   | 24 ++++++++++++++++++++
 1 file changed, 24 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/10525c29/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index ed4cc75..89e713f 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -306,6 +306,30 @@ class KafkaSourceSuite extends KafkaSourceTest {
     )
   }
 
+  test("starting offset is latest by default") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+    testUtils.sendMessages(topic, Array("0"))
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+
+    val kafka = reader.load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+    val mapped = kafka.map(_.toInt)
+
+    testStream(mapped)(
+      makeSureGetOffsetCalled,
+      AddKafkaData(Set(topic), 1, 2, 3),
+      CheckAnswer(1, 2, 3)  // should not have 0
+    )
+  }
+
   test("bad source options") {
     def testBadOptions(options: (String, String)*)(expectedMsgs: String*): Unit = {
       val ex = intercept[IllegalArgumentException] {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message