From: zsxwing@apache.org
To: commits@spark.apache.org
Message-Id: <2508e344138242a4aaa857b36c5c02dc@git.apache.org>
Subject: spark git commit: [SPARK-18283][STRUCTURED STREAMING][KAFKA] Added test to check whether default starting offset is latest
Date: Mon, 7 Nov 2016 18:44:08 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/branch-2.0 b5d7217af -> 10525c294


[SPARK-18283][STRUCTURED STREAMING][KAFKA] Added test to check whether default starting offset is latest

## What changes were proposed in this pull request?

Added a test to check that the starting offset defaults to the latest offsets.

## How was this patch tested?

New unit test.

Author: Tathagata Das

Closes #15778 from tdas/SPARK-18283.
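Editor's note (a sketch for context, not part of the commit): the behavior the new test pins down is that a Kafka source created without any starting-offset option begins reading at the latest available offsets, so records published before the streaming query starts are never returned. A minimal reader of that kind is sketched below; the broker address and topic name are placeholders, and the chained calls mirror the ones used in the test.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-latest-by-default").getOrCreate()
import spark.implicits._

// No starting-offset option is given, so the source starts at the latest offsets:
// anything written to the topic before this query starts is skipped.
val fromLatest = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker address
  .option("subscribe", "events")                       // placeholder topic name
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]
```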
(cherry picked from commit b06c23db9aedae48c9eba9d702ae82fa5647cfe5)
Signed-off-by: Shixiong Zhu


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/10525c29
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/10525c29
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/10525c29

Branch: refs/heads/branch-2.0
Commit: 10525c2947d9d1593e77e6af692573b03de6a71f
Parents: b5d7217
Author: Tathagata Das
Authored: Mon Nov 7 10:43:36 2016 -0800
Committer: Shixiong Zhu
Committed: Mon Nov 7 10:44:05 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/kafka010/KafkaSourceSuite.scala | 24 ++++++++++++++++++++
 1 file changed, 24 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/10525c29/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index ed4cc75..89e713f 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -306,6 +306,30 @@ class KafkaSourceSuite extends KafkaSourceTest {
     )
   }
 
+  test("starting offset is latest by default") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+    testUtils.sendMessages(topic, Array("0"))
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+
+    val kafka = reader.load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+    val mapped = kafka.map(_.toInt)
+
+    testStream(mapped)(
+      makeSureGetOffsetCalled,
+      AddKafkaData(Set(topic), 1, 2, 3),
+      CheckAnswer(1, 2, 3) // should not have 0
+    )
+  }
+
   test("bad source options") {
     def testBadOptions(options: (String, String)*)(expectedMsgs: String*): Unit = {
       val ex = intercept[IllegalArgumentException] {
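Editor's note (a sketch, not part of the diff above): when a job needs the opposite behavior, the starting position can be set explicitly rather than relying on the "latest" default that the test verifies. The sketch assumes the source exposes a `startingOffsets` option accepting `"earliest"`; the exact option name and accepted values should be checked against the kafka-0-10-sql documentation for the Spark version in use. It also reuses the `spark` session from the earlier sketch.

```scala
// Hypothetical override of the default starting position.
// The "startingOffsets" option name is an assumption; verify it for your Spark version.
val fromEarliest = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker address
  .option("subscribe", "events")                       // placeholder topic name
  .option("startingOffsets", "earliest")               // start from the beginning instead of the latest offsets
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]
```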