spark-commits mailing list archives

From zsxw...@apache.org
Subject spark git commit: [SPARK-19564][SPARK-19559][SS][KAFKA] KafkaOffsetReader's consumers should not be in the same group
Date Mon, 13 Feb 2017 07:00:33 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 06e77e009 -> fe4fcc570


[SPARK-19564][SPARK-19559][SS][KAFKA] KafkaOffsetReader's consumers should not be in the same group

## What changes were proposed in this pull request?

In `KafkaOffsetReader`, when an error occurs, we abort the existing consumer and create a new
consumer. In our current implementation, the first consumer and the second consumer end up
in the same group (which leads to SPARK-19559), **_violating our intention that the two consumers
not be in the same group._**

The cause is that, in our current implementation, the first consumer is created before `groupId`
and `nextId` are initialized in the constructor. So even though `groupId` and `nextId` are updated
during the creation of that first consumer, their initializers run afterwards and reset them
to their default values, and the second consumer is then created with the same group id as the first.

We should make sure that `groupId` and `nextId` are initialized before any consumer is created.
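
The initialization-order problem described above can be reproduced in isolation. The following is a minimal sketch, not the actual `KafkaOffsetReader` code: `BuggyReader`, `FixedReader`, `consumerGroup`, and `resetConsumer` are hypothetical names used only for illustration, and a plain `String` stands in for the Kafka consumer. It assumes only that Scala runs class-body initializers in textual order.

```scala
// Hypothetical stand-in for the pre-patch layout: the "consumer" is created
// before groupId and nextId have been initialized.
class BuggyReader(prefix: String) {
  // Runs first: nextGroupId() sets groupId and bumps nextId to 1 ...
  var consumerGroup: String = nextGroupId()

  // ... then these initializers run and reset both fields.
  private var groupId: String = null
  private var nextId = 0

  private def nextGroupId(): String = {
    groupId = prefix + "-" + nextId
    nextId += 1
    groupId
  }

  // Simulates aborting the consumer and creating a new one after an error.
  def resetConsumer(): Unit = { consumerGroup = nextGroupId() }
}

// Hypothetical stand-in for the patched layout: the counters come first.
class FixedReader(prefix: String) {
  private var groupId: String = null
  private var nextId = 0

  var consumerGroup: String = nextGroupId()

  private def nextGroupId(): String = {
    groupId = prefix + "-" + nextId
    nextId += 1
    groupId
  }

  def resetConsumer(): Unit = { consumerGroup = nextGroupId() }
}

object InitOrderDemo {
  def main(args: Array[String]): Unit = {
    val buggy = new BuggyReader("g")
    val buggyFirst = buggy.consumerGroup
    buggy.resetConsumer()
    println(s"buggy: $buggyFirst then ${buggy.consumerGroup}") // buggy: g-0 then g-0

    val fixed = new FixedReader("g")
    val fixedFirst = fixed.consumerGroup
    fixed.resetConsumer()
    println(s"fixed: $fixedFirst then ${fixed.consumerGroup}") // fixed: g-0 then g-1
  }
}
```

In the buggy layout both "consumers" receive the same group id because the counter is reset after the first one is created; moving the two fields above the first use is enough to make the ids distinct.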

## How was this patch tested?

Ran `KafkaSourceSuite` 100 times; all runs passed.

Author: Liwei Lin <lwlin7@gmail.com>

Closes #16902 from lw-lin/SPARK-19564-.

(cherry picked from commit 2bdbc87052389ff69404347fbc69457132dbcafd)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe4fcc57
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe4fcc57
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe4fcc57

Branch: refs/heads/branch-2.1
Commit: fe4fcc5701cbd3f2e698e00f1cc7d49d5c7c702b
Parents: 06e77e0
Author: Liwei Lin <lwlin7@gmail.com>
Authored: Sun Feb 12 23:00:22 2017 -0800
Committer: Shixiong Zhu <shixiong@databricks.com>
Committed: Sun Feb 12 23:00:30 2017 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/kafka010/KafkaOffsetReader.scala    | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fe4fcc57/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index 6b2fb3c..2696d6f 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -65,6 +65,13 @@ private[kafka010] class KafkaOffsetReader(
   val execContext = ExecutionContext.fromExecutorService(kafkaReaderThread)
 
   /**
+   * Place [[groupId]] and [[nextId]] here so that they are initialized before any consumer is
+   * created -- see SPARK-19564.
+   */
+  private var groupId: String = null
+  private var nextId = 0
+
+  /**
    * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the
    * offsets and never commits them.
    */
@@ -76,10 +83,6 @@ private[kafka010] class KafkaOffsetReader(
   private val offsetFetchAttemptIntervalMs =
     readerOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
 
-  private var groupId: String = null
-
-  private var nextId = 0
-
   private def nextGroupId(): String = {
     groupId = driverGroupIdPrefix + "-" + nextId
     nextId += 1



