kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-2058: Fix ProducerTest.testSendWithDeadBroker transient failure
Date Mon, 21 Dec 2015 06:01:37 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 8c754c45a -> 3615e4773


KAFKA-2058: Fix ProducerTest.testSendWithDeadBroker transient failure

It turns out that waitUntilMetadataIsPropagated is not enough.
In "onBrokerStartup", the calls below send both a LeaderAndIsrRequest and an UpdateMetadataRequest
to KafkaApis:
    replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
    partitionStateMachine.triggerOnlinePartitionStateChange()
The two kinds of request are handled separately, and the order in which they are handled is
not guaranteed.
If the UpdateMetadataRequest is handled first, the metadataCache of kafkaApis is updated, so
TestUtils.waitUntilMetadataIsPropagated is satisfied and the consumer can (and will) start
fetching data.
But if the LeaderAndIsrRequest has not been handled by that point, "becomeLeaderOrFollower" has
not been called, so structures like "leaderReplicaOpt" have not been updated, and the
consumer's fetch fails.
To fix this, the consumer should start fetching only after the partition's leaderReplica has
been refreshed, not merely after the leader has been elected.
This change therefore adds "TestUtils.waitUntilLeaderIsKnown(servers, topic, 0)".
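
The race described above can be reproduced in miniature without Kafka. The following is only a sketch under stated assumptions: FakeBroker, its two flags, and this simplified waitUntilTrue are hypothetical stand-ins invented for illustration, not the real KafkaServer, ReplicaManager, or TestUtils code. It models two independently handled requests and shows why waiting on the metadata condition alone does not imply that a fetch can be served.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for a broker. The two flags model state that is
// updated by two independently handled controller requests.
final class FakeBroker {
  val metadataCached = new AtomicBoolean(false) // set when UpdateMetadataRequest is handled
  val leaderKnown    = new AtomicBoolean(false) // set when LeaderAndIsrRequest is handled

  def handleUpdateMetadata(): Unit = metadataCached.set(true)
  def handleLeaderAndIsr(): Unit   = leaderKnown.set(true)

  // In this model, a fetch only succeeds once the local leader replica exists.
  def canServeFetch: Boolean = leaderKnown.get
}

object WaitSketch {
  // Simplified polling helper (an assumption, not the real TestUtils.waitUntilTrue):
  // re-check the condition until it holds or the timeout expires.
  def waitUntilTrue(condition: () => Boolean, msg: String,
                    waitTime: Long = 5000L, pause: Long = 10L): Unit = {
    val deadline = System.currentTimeMillis() + waitTime
    while (!condition()) {
      if (System.currentTimeMillis() > deadline) throw new AssertionError(msg)
      Thread.sleep(pause)
    }
  }

  def main(args: Array[String]): Unit = {
    val broker = new FakeBroker

    // UpdateMetadataRequest happens to be handled first...
    broker.handleUpdateMetadata()

    // ...while LeaderAndIsrRequest is handled a little later on another thread.
    val handler = new Thread(new Runnable {
      def run(): Unit = { Thread.sleep(100); broker.handleLeaderAndIsr() }
    })
    handler.start()

    // Waiting on metadata alone returns immediately; a fetch may still fail here.
    waitUntilTrue(() => broker.metadataCached.get, "metadata not propagated")
    println(s"after metadata wait: canServeFetch = ${broker.canServeFetch}")

    // Waiting on the leader condition closes the gap.
    waitUntilTrue(() => broker.leaderKnown.get, "leader not known")
    println(s"after leader wait: canServeFetch = ${broker.canServeFetch}")

    handler.join()
  }
}
```

In this toy model the first wait usually returns before the delayed handler has run, so a fetch at that point would typically fail, while the second wait returns only once the leader flag is set. That is the gap the added waitUntilLeaderIsKnown call is meant to close in the test.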

Author: ZoneMayor <jinxing6042@126.com>
Author: jinxing <jinxing@fenbi.com>

Reviewers: Ismael Juma, Guozhang Wang

Closes #689 from ZoneMayor/trunk-KAFKA-2058


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3615e477
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3615e477
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3615e477

Branch: refs/heads/trunk
Commit: 3615e4773dd20327dead494cd0ba6a26edada92d
Parents: 8c754c4
Author: Jin Xing <jinxing6042@126.com>
Authored: Sun Dec 20 22:01:21 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Sun Dec 20 22:01:21 2015 -0800

----------------------------------------------------------------------
 .../src/test/scala/unit/kafka/producer/ProducerTest.scala |  1 +
 core/src/test/scala/unit/kafka/utils/TestUtils.scala      | 10 ++++++++++
 2 files changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3615e477/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 18a0cd5..79b2603 100755
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -252,6 +252,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
     server1.startup()
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
+    TestUtils.waitUntilLeaderIsKnown(servers, topic, 0)
 
     try {
       // cross check if broker 1 got the messages

http://git-wip-us.apache.org/repos/asf/kafka/blob/3615e477/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c04b52c..0221373 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -800,6 +800,16 @@ object TestUtils extends Logging {
     leader
   }
 
+  def waitUntilLeaderIsKnown(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long = 5000L): Unit = {
+    TestUtils.waitUntilTrue(() => 
+      servers.exists { server =>
+        server.replicaManager.getPartition(topic, partition).exists(_.leaderReplicaIfLocal().isDefined)
+      },
+      "Partition [%s,%d] leaders not made yet after %d ms".format(topic, partition, timeout),
+      waitTime = timeout
+    )
+  }
+
   def writeNonsenseToFile(fileName: File, position: Long, size: Int) {
     val file = new RandomAccessFile(fileName, "rw")
     file.seek(position)

