From: zsxwing@apache.org
To: commits@spark.apache.org
Message-Id: <94d56eb1328843f3b75f1b7a4a600721@git.apache.org>
Subject: spark git commit: [SPARK-19564][SPARK-19559][SS][KAFKA] KafkaOffsetReader's consumers should not be in the same group
Date: Mon, 13 Feb 2017 07:00:33 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/branch-2.1 06e77e009 -> fe4fcc570


[SPARK-19564][SPARK-19559][SS][KAFKA] KafkaOffsetReader's consumers should not be in the same group

## What changes were proposed in this pull request?

In `KafkaOffsetReader`, when an error occurs, we abort the existing consumer and create a new one. In the current implementation, however, the first consumer and its replacement end up in the same group (which leads to SPARK-19559), **_violating our intention that the two consumers not be in the same group._**

The cause is that the first consumer is created before `groupId` and `nextId` are initialized in the constructor. Even though `groupId` and `nextId` are advanced while that first consumer is being created, their declarations then run and reset them to their default values, so the second consumer starts from the same values and reuses the same group. We should make sure that `groupId` and `nextId` are initialized before any consumer is created.

## How was this patch tested?

Ran `KafkaSourceSuite` 100 times; all runs passed.

Author: Liwei Lin

Closes #16902 from lw-lin/SPARK-19564-.
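For context only (not part of the patch or of Spark's sources): a minimal Scala sketch of the initialization-order pitfall described above. The class names `BuggyReader`/`FixedReader`, the `prefix` parameter, and the printed group ids are invented for illustration; in Scala, statements in a class body run in declaration order, so a field that is read before its own declaration still holds its JVM default (null / 0) and is then reset when the declaration runs.

```scala
// Standalone illustration (not Spark code) of the SPARK-19564 pitfall.
object InitOrderDemo extends App {

  class BuggyReader(prefix: String) {
    // Runs first: nextGroupId() sees nextId == 0 and sets groupId/nextId...
    val firstConsumerGroup: String = nextGroupId()

    // ...but these declarations then run and reset both fields to their defaults.
    private var groupId: String = null
    private var nextId = 0

    private def nextGroupId(): String = {
      groupId = prefix + "-" + nextId
      nextId += 1
      groupId
    }

    // A replacement consumer created later starts counting from 0 again,
    // so it lands in the same group as the first consumer.
    def secondConsumerGroup(): String = nextGroupId()
  }

  class FixedReader(prefix: String) {
    // Fix: declare (and therefore initialize) the counters before anything that uses them.
    private var groupId: String = null
    private var nextId = 0

    val firstConsumerGroup: String = nextGroupId()

    private def nextGroupId(): String = {
      groupId = prefix + "-" + nextId
      nextId += 1
      groupId
    }

    def secondConsumerGroup(): String = nextGroupId()
  }

  val buggy = new BuggyReader("spark-kafka-source")
  println(buggy.firstConsumerGroup)     // spark-kafka-source-0
  println(buggy.secondConsumerGroup())  // spark-kafka-source-0  <- same group: the bug
  val fixed = new FixedReader("spark-kafka-source")
  println(fixed.firstConsumerGroup)     // spark-kafka-source-0
  println(fixed.secondConsumerGroup())  // spark-kafka-source-1  <- distinct group
}
```

This is why the patch below simply moves the two `var` declarations above the point where the first consumer is created.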
(cherry picked from commit 2bdbc87052389ff69404347fbc69457132dbcafd)
Signed-off-by: Shixiong Zhu


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe4fcc57
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe4fcc57
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe4fcc57

Branch: refs/heads/branch-2.1
Commit: fe4fcc5701cbd3f2e698e00f1cc7d49d5c7c702b
Parents: 06e77e0
Author: Liwei Lin
Authored: Sun Feb 12 23:00:22 2017 -0800
Committer: Shixiong Zhu
Committed: Sun Feb 12 23:00:30 2017 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/kafka010/KafkaOffsetReader.scala | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fe4fcc57/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index 6b2fb3c..2696d6f 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -65,6 +65,13 @@ private[kafka010] class KafkaOffsetReader(
   val execContext = ExecutionContext.fromExecutorService(kafkaReaderThread)
 
   /**
+   * Place [[groupId]] and [[nextId]] here so that they are initialized before any consumer is
+   * created -- see SPARK-19564.
+   */
+  private var groupId: String = null
+  private var nextId = 0
+
+  /**
    * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the
    * offsets and never commits them.
    */
@@ -76,10 +83,6 @@ private[kafka010] class KafkaOffsetReader(
   private val offsetFetchAttemptIntervalMs =
     readerOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
 
-  private var groupId: String = null
-
-  private var nextId = 0
-
   private def nextGroupId(): String = {
     groupId = driverGroupIdPrefix + "-" + nextId
     nextId += 1

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org