kafka-commits mailing list archives

From jun...@apache.org
Subject git commit: System Test Transient Failure on testcase_0122; patched by Sriram Subramanian; reviewed by Jun Rao and Neha Narkhede; kafka-772
Date Tue, 05 Mar 2013 06:40:41 GMT
Updated Branches:
  refs/heads/0.8 92ecebecd -> 0ee46e05d


System Test Transient Failure on testcase_0122; patched by Sriram Subramanian; reviewed by
Jun Rao and Neha Narkhede; kafka-772


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0ee46e05
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0ee46e05
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0ee46e05

Branch: refs/heads/0.8
Commit: 0ee46e05ddcbf1d6bc2aa0039800aaeac860ad56
Parents: 92ecebe
Author: Sriram Subramanian <sriram.sub@gmail.com>
Authored: Mon Mar 4 22:40:12 2013 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Mar 4 22:40:12 2013 -0800

----------------------------------------------------------------------
 .../scala/kafka/server/AbstractFetcherThread.scala |    3 ++-
 1 files changed, 2 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0ee46e05/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index a7d39b1..4ee23cd 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -114,7 +114,8 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
           case(topicAndPartition, partitionData) =>
             val (topic, partitionId) = topicAndPartition.asTuple
             val currentOffset = partitionMap.get(topicAndPartition)
-            if (currentOffset.isDefined) {
+            // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
+            if (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) {
               partitionData.error match {
                 case ErrorMapping.NoError =>
                   val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
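
The guard added in this hunk can be illustrated in isolation. The following is a minimal sketch, not Kafka's actual code: `FetchGuard`, `shouldAppend`, the simplified `TopicAndPartition` case class, and the plain `Map`s standing in for `partitionMap` and `fetchRequest.requestInfo` are all hypothetical names introduced for illustration. It assumes the case the check protects against is a partition whose tracked offset changed between sending the fetch request and processing its response.

```scala
// Hypothetical, simplified model of the check in this patch.
// None of these names are Kafka's real classes or methods.
object FetchGuard {
  case class TopicAndPartition(topic: String, partition: Int)

  // Fetched data may be appended only if the partition is still tracked
  // AND its current offset equals the offset the fetch request asked for.
  // Assumption: a partition missing from the request is treated as
  // "do not append" here, which the original code does not spell out.
  def shouldAppend(
      partitionMap: Map[TopicAndPartition, Long],
      requestedOffsets: Map[TopicAndPartition, Long],
      tp: TopicAndPartition): Boolean =
    (partitionMap.get(tp), requestedOffsets.get(tp)) match {
      case (Some(current), Some(requested)) => current == requested
      case _                                => false
    }
}
```

With `tp = FetchGuard.TopicAndPartition("t", 0)`, a call with both maps holding offset `5L` for `tp` allows the append, while a call where the tracked offset has moved to `0L` but the request was issued for `5L` skips it, so a stale response is not written at the wrong position.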

