incubator-kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject svn commit: r1208919 - in /incubator/kafka/trunk/core/src/main/scala/kafka: consumer/ZookeeperConsumerConnector.scala server/KafkaServerStartable.scala
Date Thu, 01 Dec 2011 00:46:27 GMT
Author: nehanarkhede
Date: Thu Dec  1 00:46:25 2011
New Revision: 1208919

URL: http://svn.apache.org/viewvc?rev=1208919&view=rev
Log:
KAFKA-212: IllegalThreadStateException in kafka mirroring; patched by nehanarkhede; reviewed by junrao and jaykreps

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1208919&r1=1208918&r2=1208919&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Thu Dec  1 00:46:25 2011
@@ -497,7 +497,10 @@ private[kafka] class ZookeeperConsumerCo
             for (i <- startPart until startPart + nParts) {
               val partition = curPartitions(i)
               info(consumerThreadId + " attempting to claim partition " + partition)
-              if (!processPartition(topicDirs, partition, topic, consumerThreadId))
+              val ownPartition = processPartition(topicDirs, partition, topic, consumerThreadId)
+              if (ownPartition)
+                info(consumerThreadId + " successfully owned partition " + partition)
+              else
                 return false
             }
             queuesToBeCleared += queues.get((topic, consumerThreadId))

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala?rev=1208919&r1=1208918&r2=1208919&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala Thu Dec  1 00:46:25 2011
@@ -130,6 +130,9 @@ class EmbeddedConsumer(private val consu
        */
       threadList.foreach(_.shutdown)
 
+      // KAFKA: 212: clear the thread list to remove the older thread references that are already shutdown
+      threadList = Nil
+
       consumerConnector = Consumer.create(consumerConfig)
       val topicMessageStreams =  consumerConnector.createMessageStreams(topicMap)
       for ((topic, streamList) <- topicMessageStreams)



Mime
View raw message