spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject spark git commit: [SPARK-2492][Streaming] kafkaReceiver minor changes to align with Kafka 0.8
Date Tue, 11 Nov 2014 10:22:52 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 fe8a1cd29 -> 7710b7156


[SPARK-2492][Streaming] kafkaReceiver minor changes to align with Kafka 0.8

Update the KafkaReceiver's behavior when auto.offset.reset is set.

In Kafka 0.8, `auto.offset.reset` is a hint for out-of-range offsets to seek to the beginning
or end of the partition. In the previous code, however, `auto.offset.reset` was an enforcement to
seek to the beginning or end immediately, which differs from Kafka 0.8's defined behavior.

Also, deleting existing ZK metadata in the Receiver when multiple consumers are launched will
introduce the issue mentioned in [SPARK-2383](https://issues.apache.org/jira/browse/SPARK-2383).

So here we change to offer the user an API to explicitly reset the offset before creating the Kafka
stream, while in the meantime keeping the same behavior as Kafka 0.8 for the parameter `auto.offset.reset`.

@tdas, would you please review this PR? Thanks a lot.

Author: jerryshao <saisai.shao@intel.com>

Closes #1420 from jerryshao/kafka-fix and squashes the following commits:

d6ae94d [jerryshao] Address the comment to remove the resetOffset() function
de3a4c8 [jerryshao] Fix compile error
4a1c3f9 [jerryshao] Doc changes
b2c1430 [jerryshao] Move offset reset to a helper function to let user explicitly delete ZK
metadata by calling this API
fac8fd6 [jerryshao] Changes to align with Kafka 0.8

(cherry picked from commit c8850a3d6d948f9dd9ee026ee350428968d3c21b)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7710b715
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7710b715
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7710b715

Branch: refs/heads/branch-1.2
Commit: 7710b7156e0c82445783c3709a4a793d820627b2
Parents: fe8a1cd
Author: jerryshao <saisai.shao@intel.com>
Authored: Tue Nov 11 02:22:23 2014 -0800
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Tue Nov 11 02:22:47 2014 -0800

----------------------------------------------------------------------
 .../streaming/kafka/KafkaInputDStream.scala     | 30 --------------------
 .../spark/streaming/kafka/KafkaUtils.scala      | 11 ++++---
 2 files changed, 5 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7710b715/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index e20e2c8..28ac592 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -26,8 +26,6 @@ import java.util.concurrent.Executors
 import kafka.consumer._
 import kafka.serializer.Decoder
 import kafka.utils.VerifiableProperties
-import kafka.utils.ZKStringSerializer
-import org.I0Itec.zkclient._
 
 import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
@@ -97,12 +95,6 @@ class KafkaReceiver[
     consumerConnector = Consumer.create(consumerConfig)
     logInfo("Connected to " + zkConnect)
 
-    // When auto.offset.reset is defined, it is our responsibility to try and whack the
-    // consumer group zk node.
-    if (kafkaParams.contains("auto.offset.reset")) {
-      tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
-    }
-
     val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
       .newInstance(consumerConfig.props)
       .asInstanceOf[Decoder[K]]
@@ -139,26 +131,4 @@ class KafkaReceiver[
       }
     }
   }
-
-  // It is our responsibility to delete the consumer group when specifying auto.offset.reset.
This
-  // is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
-  //
-  // The kafka high level consumer doesn't expose setting offsets currently, this is a trick
copied
-  // from Kafka's ConsoleConsumer. See code related to 'auto.offset.reset' when it is set
to
-  // 'smallest'/'largest':
-  // scalastyle:off
-  // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
-  // scalastyle:on
-  private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
-    val dir = "/consumers/" + groupId
-    logInfo("Cleaning up temporary Zookeeper data under " + dir + ".")
-    val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
-    try {
-      zk.deleteRecursive(dir)
-    } catch {
-      case e: Throwable => logWarning("Error cleaning up temporary Zookeeper data", e)
-    } finally {
-      zk.close()
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7710b715/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 48668f7..ec812e1 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -17,19 +17,18 @@
 
 package org.apache.spark.streaming.kafka
 
-import scala.reflect.ClassTag
-import scala.collection.JavaConversions._
-
 import java.lang.{Integer => JInt}
 import java.util.{Map => JMap}
 
+import scala.reflect.ClassTag
+import scala.collection.JavaConversions._
+
 import kafka.serializer.{Decoder, StringDecoder}
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext,
JavaPairDStream}
-import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
-
+import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
 
 object KafkaUtils {
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message