kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject git commit: kafka-1103; Consumer uses two zkclients; patched by Guozhang Wang; reviewed by Joel Koshy and Jun Rao
Date Fri, 22 Nov 2013 17:15:45 GMT
Updated Branches:
  refs/heads/trunk 2477a7468 -> 87efda7f8


kafka-1103; Consumer uses two zkclients; patched by Guozhang Wang; reviewed by Joel Koshy and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/87efda7f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/87efda7f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/87efda7f

Branch: refs/heads/trunk
Commit: 87efda7f818218e0868be7032c73c994d75931fd
Parents: 2477a74
Author: Guozhang Wang <guwang@linkedin.com>
Authored: Fri Nov 22 09:16:39 2013 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Nov 22 09:16:39 2013 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/consumer/TopicFilter.scala  |  6 ------
 .../consumer/ZookeeperConsumerConnector.scala    | 18 +++++-------------
 .../consumer/ZookeeperTopicEventWatcher.scala    | 19 ++++++-------------
 .../unit/kafka/consumer/TopicFilterTest.scala    |  4 ----
 4 files changed, 11 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/87efda7f/core/src/main/scala/kafka/consumer/TopicFilter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicFilter.scala b/core/src/main/scala/kafka/consumer/TopicFilter.scala
index cf3853b..4f20823 100644
--- a/core/src/main/scala/kafka/consumer/TopicFilter.scala
+++ b/core/src/main/scala/kafka/consumer/TopicFilter.scala
@@ -41,14 +41,10 @@ sealed abstract class TopicFilter(rawRegex: String) extends Logging {
 
   override def toString = regex
 
-  def requiresTopicEventWatcher: Boolean
-
   def isTopicAllowed(topic: String): Boolean
 }
 
 case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
-  override def requiresTopicEventWatcher = !regex.matches("""[\p{Alnum}-|]+""")
-
   override def isTopicAllowed(topic: String) = {
     val allowed = topic.matches(regex)
 
@@ -62,8 +58,6 @@ case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
 }
 
 case class Blacklist(rawRegex: String) extends TopicFilter(rawRegex) {
-  override def requiresTopicEventWatcher = true
-
   override def isTopicAllowed(topic: String) = {
     val allowed = !topic.matches(regex)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/87efda7f/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 6d0cfa6..0cc236a 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -754,19 +754,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     registerConsumerInZK(dirs, consumerIdString, wildcardTopicCount)
     reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams)
 
-    if (!topicFilter.requiresTopicEventWatcher) {
-      info("Not creating event watcher for trivial whitelist " + topicFilter)
-    }
-    else {
-      info("Creating topic event watcher for whitelist " + topicFilter)
-      wildcardTopicWatcher = new ZookeeperTopicEventWatcher(config, this)
-
-      /*
-       * Topic events will trigger subsequent synced rebalances. Also, the
-       * consumer will get registered only after an allowed topic becomes
-       * available.
-       */
-    }
+    /*
+     * Topic events will trigger subsequent synced rebalances.
+     */
+    info("Creating topic event watcher for topics " + topicFilter)
+    wildcardTopicWatcher = new ZookeeperTopicEventWatcher(zkClient, this)
 
     def handleTopicEvent(allTopics: Seq[String]) {
       debug("Handling topic event")

http://git-wip-us.apache.org/repos/asf/kafka/blob/87efda7f/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
index a67c193..38f4ec0 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
@@ -22,14 +22,11 @@ import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
 import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 
-class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
+class ZookeeperTopicEventWatcher(val zkClient: ZkClient,
     val eventHandler: TopicEventHandler[String]) extends Logging {
 
   val lock = new Object()
 
-  private var zkClient: ZkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs,
-      config.zkConnectionTimeoutMs, ZKStringSerializer)
-
   startWatchingTopicEvents()
 
   private def startWatchingTopicEvents() {
@@ -53,11 +50,10 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
       info("Shutting down topic event watcher.")
       if (zkClient != null) {
         stopWatchingTopicEvents()
-        zkClient.close()
-        zkClient = null
       }
-      else
-        warn("Cannot shutdown already shutdown topic event watcher.")
+      else {
+        warn("Cannot shutdown since the embedded zookeeper client has already closed.")
+      }
     }
   }
 
@@ -70,7 +66,6 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
           if (zkClient != null) {
             val latestTopics = zkClient.getChildren(ZkUtils.BrokerTopicsPath).toList
             debug("all topics: %s".format(latestTopics))
-
             eventHandler.handleTopicEvent(latestTopics)
           }
         }
@@ -93,10 +88,8 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
     def handleNewSession() {
       lock.synchronized {
         if (zkClient != null) {
-          info(
-            "ZK expired: resubscribing topic event listener to topic registry")
-          zkClient.subscribeChildChanges(
-            ZkUtils.BrokerTopicsPath, topicEventListener)
+          info("ZK expired: resubscribing topic event listener to topic registry")
+          zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicEventListener)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/87efda7f/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
index 40a2bf7..cf2724b 100644
--- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
@@ -29,16 +29,13 @@ class TopicFilterTest extends JUnitSuite {
   def testWhitelists() {
 
     val topicFilter1 = new Whitelist("white1,white2")
-    assertFalse(topicFilter1.requiresTopicEventWatcher)
     assertTrue(topicFilter1.isTopicAllowed("white2"))
     assertFalse(topicFilter1.isTopicAllowed("black1"))
 
     val topicFilter2 = new Whitelist(".+")
-    assertTrue(topicFilter2.requiresTopicEventWatcher)
     assertTrue(topicFilter2.isTopicAllowed("alltopics"))
     
     val topicFilter3 = new Whitelist("white_listed-topic.+")
-    assertTrue(topicFilter3.requiresTopicEventWatcher)
     assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1"))
     assertFalse(topicFilter3.isTopicAllowed("black1"))
   }
@@ -46,6 +43,5 @@ class TopicFilterTest extends JUnitSuite {
   @Test
   def testBlacklists() {
     val topicFilter1 = new Blacklist("black1")
-    assertTrue(topicFilter1.requiresTopicEventWatcher)
   }
 }
\ No newline at end of file


Mime
View raw message