kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1182028 - /incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
Date Tue, 11 Oct 2011 19:11:39 GMT
Author: junrao
Date: Tue Oct 11 19:11:38 2011
New Revision: 1182028

URL: http://svn.apache.org/viewvc?rev=1182028&view=rev
Log:
ZK consumer may lose a chunk worth of message during rebalance; patched by Jun Rao; reviewed
by Neha Narkhede; KAFKA-154

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala?rev=1182028&r1=1182027&r2=1182028&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala Tue
Oct 11 19:11:38 2011
@@ -64,10 +64,10 @@ private[consumer] class PartitionTopicIn
       // update fetched offset to the compressed data chunk size, not the decompressed message
set size
       if(logger.isTraceEnabled)
         logger.trace("Updating fetch offset = " + fetchedOffset.get + " with size = " + size)
+      chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
       val newOffset = fetchedOffset.addAndGet(size)
       if (logger.isDebugEnabled)
         logger.debug("updated fetch offset of ( %s ) to %d".format(this, newOffset))
-      chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
     }
     size
   }



Mime
View raw message