kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject kafka git commit: Revert "KAFKA-1764; ZookeeperConsumerConnector should not put multiple shutdown commands to the same data chunk queue; reviewed by Joel Koshy and Guozhang Wang"
Date Fri, 14 Nov 2014 04:58:17 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d99af88ea -> 8c35030e4


Revert "KAFKA-1764; ZookeeperConsumerConnector should not put multiple shutdown commands to
the same data chunk queue; reviewed by Joel Koshy and Guozhang Wang"

This reverts commit d99af88eafefbfd1c537a64f8107cd9041346015.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8c35030e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8c35030e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8c35030e

Branch: refs/heads/trunk
Commit: 8c35030e4d24e603e5dbd53862ca6ae4fe59b4e9
Parents: d99af88
Author: Joel Koshy <jjkoshy@gmail.com>
Authored: Thu Nov 13 20:58:03 2014 -0800
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Thu Nov 13 20:58:03 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/consumer/ConsumerIterator.scala          | 1 +
 .../src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8c35030e/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index 78fbf75..ac491b4 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -71,6 +71,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
       }
       if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
         debug("Received the shutdown command")
+        channel.offer(currentDataChunk)
         return allDone
       } else {
         currentTopicInfo = currentDataChunk.topicInfo

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c35030e/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 2402b45..fbc680f 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -256,7 +256,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   }
 
   private def sendShutdownToAllQueues() = {
-    for (queue <- topicThreadIdAndQueues.values.toSet) {
+    for (queue <- topicThreadIdAndQueues.values) {
       debug("Clearing up queue")
       queue.clear()
       queue.put(ZookeeperConsumerConnector.shutdownCommand)


Mime
View raw message