kafka-commits mailing list archives

From jun...@apache.org
Subject git commit: kafka-861; IndexOutOfBoundsException while fetching data from leader; patched by Sriram Subramanian; reviewed by Jun Rao and Neha Narkhede
Date Tue, 16 Apr 2013 01:10:09 GMT
Updated Branches:
  refs/heads/0.8 158baf661 -> b29b6296a


kafka-861; IndexOutOfBoundsException while fetching data from leader; patched by Sriram Subramanian; reviewed by Jun Rao and Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b29b6296
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b29b6296
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b29b6296

Branch: refs/heads/0.8
Commit: b29b6296ac5b799a78d61920a3f4131051230a02
Parents: 158baf6
Author: Sriram Subramanian <sriram.sub@gmail.com>
Authored: Mon Apr 15 18:09:57 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Apr 15 18:09:57 2013 -0700

----------------------------------------------------------------------
 .../scala/kafka/message/ByteBufferMessageSet.scala |    2 +-
 .../scala/kafka/server/AbstractFetcherThread.scala |   11 ++++++++---
 2 files changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b29b6296/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 03590ad..078ebb4 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -154,7 +154,7 @@ class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends Message
           return allDone()
         val offset = topIter.getLong()
         val size = topIter.getInt()
-        if(size < 0)
+        if(size < Message.MinHeaderSize)
          throw new InvalidMessageException("Message found with corrupt size (" + size + ")")
         
         // we have an incomplete message
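
For context, here is a minimal standalone sketch of the reasoning behind the tightened bound (illustrative code only, not the patched Kafka source; the constant value and names below are placeholders). A size in the range [0, MinHeaderSize) passed the old "size < 0" check but is still too small to hold a message header, so downstream parsing of the slice can blow up (presumably the IndexOutOfBoundsException in the JIRA title).

import java.nio.ByteBuffer

object SizeCheckSketch {
  // Placeholder value; the real bound is kafka.message.Message.MinHeaderSize
  val MinHeaderSize = 14

  // Mirrors the shallow-iterator step: read offset and size, then validate the size
  // against the smallest legal message instead of merely rejecting negative values.
  def sliceNextMessage(iter: ByteBuffer): ByteBuffer = {
    val offset = iter.getLong()   // logical offset, unused in this sketch
    val size = iter.getInt()
    if (size < MinHeaderSize)     // old check: if (size < 0)
      throw new IllegalArgumentException("Message found with corrupt size (" + size + ")")
    val message = iter.slice()
    message.limit(size)
    message
  }
}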

http://git-wip-us.apache.org/repos/asf/kafka/blob/b29b6296/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index b6845e4..fed3b86 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -19,8 +19,7 @@ package kafka.server
 
 import kafka.cluster.Broker
 import collection.mutable
-import kafka.message.ByteBufferMessageSet
-import kafka.message.MessageAndOffset
+import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.atomic.AtomicLong
@@ -131,8 +130,14 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
                     // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                     processPartitionData(topicAndPartition, currentOffset.get, partitionData)
                   } catch {
+                    case ime: InvalidMessageException =>
+                      // we log the error and continue. This ensures two things
+                      // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
+                      // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
+                      //    should get fixed in the subsequent fetches
+                      logger.warn("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
                     case e =>
-                      throw new KafkaException("error processing data for topic %s partititon %d offset %d"
+                      throw new KafkaException("error processing data for partition [%s,%d] offset %d"
                                                .format(topic, partitionId, currentOffset.get), e)
                   }
                 case ErrorMapping.OffsetOutOfRangeCode =>
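
To make the intent of the fetcher change concrete, here is a self-contained sketch of the catch-and-continue pattern (simplified stand-ins, not the real AbstractFetcherThread API): invalid messages in one partition are logged and skipped so the fetcher thread keeps serving its other partitions, while any other failure still aborts the fetch.

object FetcherSketch {
  class InvalidMessageException(msg: String) extends RuntimeException(msg)
  case class TopicAndPartition(topic: String, partition: Int)

  def processFetchedData(fetched: Map[TopicAndPartition, Array[Byte]],
                         process: (TopicAndPartition, Array[Byte]) => Unit): Unit = {
    for ((tp, data) <- fetched) {
      try {
        process(tp, data)
      } catch {
        case ime: InvalidMessageException =>
          // Corruption may be transient (truncation, partial writes); warn and move on
          // so one bad partition does not make the others lag behind the leader.
          println("WARN: invalid messages for [%s,%d]: %s".format(tp.topic, tp.partition, ime.getMessage))
        case e: Throwable =>
          // Anything else is still fatal for this piece of partition data.
          throw new RuntimeException("error processing data for partition [%s,%d]".format(tp.topic, tp.partition), e)
      }
    }
  }
}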

