Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id F3A51200B84 for ; Tue, 20 Sep 2016 16:39:27 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id F2130160AC0; Tue, 20 Sep 2016 14:39:27 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 1CC8B160AC5 for ; Tue, 20 Sep 2016 16:39:26 +0200 (CEST) Received: (qmail 6496 invoked by uid 500); 20 Sep 2016 14:39:26 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 6487 invoked by uid 99); 20 Sep 2016 14:39:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Sep 2016 14:39:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 27ACAE0019; Tue, 20 Sep 2016 14:39:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ijuma@apache.org To: commits@kafka.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: KAFKA-4184; Intermittent failures in ReplicationQuotasTest.shouldBootstrapTwoBrokersWithFollowerThrottle Date: Tue, 20 Sep 2016 14:39:26 +0000 (UTC) archived-at: Tue, 20 Sep 2016 14:39:28 -0000 Repository: kafka Updated Branches: refs/heads/0.10.1 fc5f48aad -> d48415f18 KAFKA-4184; Intermittent failures in ReplicationQuotasTest.shouldBootstrapTwoBrokersWithFollowerThrottle Build is unstable, so it's hard to validate this change. Of the various builds up until 11am BST the test ran twice and passed twice. Author: Ben Stopford Reviewers: Ismael Juma Closes #1873 from benstopford/KAFKA-4184 (cherry picked from commit 3663275cf066b7715cc11b26fd9c144bbff1c373) Signed-off-by: Ismael Juma Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d48415f1 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d48415f1 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d48415f1 Branch: refs/heads/0.10.1 Commit: d48415f185d1882f0a3b89a3ce03ea84893393ba Parents: fc5f48a Author: Ben Stopford Authored: Tue Sep 20 14:53:48 2016 +0100 Committer: Ismael Juma Committed: Tue Sep 20 15:39:17 2016 +0100 ---------------------------------------------------------------------- .../server/ReplicationQuotaManagerTest.scala | 3 +- .../kafka/server/ReplicationQuotasTest.scala | 93 ++++++++++---------- 2 files changed, 49 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d48415f1/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala index 5c41372..3616b7b 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala @@ -14,13 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package unit.kafka.server +package kafka.server import java.util.Collections import kafka.common.TopicAndPartition import kafka.server.QuotaType._ -import kafka.server.{QuotaType, ReplicationQuotaManager, ReplicationQuotaManagerConfig} import org.apache.kafka.common.metrics.{Quota, MetricConfig, Metrics} import org.apache.kafka.common.utils.MockTime import org.junit.Assert.{assertFalse, assertTrue, assertEquals} http://git-wip-us.apache.org/repos/asf/kafka/blob/d48415f1/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala index af7c4c8..88b9b89 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package unit.kafka.server +package kafka.server import java.util.Properties @@ -25,7 +25,6 @@ import kafka.common._ import kafka.log.LogConfig._ import kafka.server.KafkaConfig.fromProps import kafka.server.QuotaType._ -import kafka.server._ import kafka.utils.TestUtils import kafka.utils.TestUtils._ import kafka.zk.ZooKeeperTestHarness @@ -85,8 +84,8 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness { brokers = (100 to 105).map { id => TestUtils.createServer(fromProps(createBrokerConfig(id, zkConnect))) } - //Given six partitions, lead on nodes 0,1,2,3,4,5 but will followers on node 6,7 (not started yet) - //And two extra partitions 6,7, which we don't intend on throttling + //Given six partitions, led on nodes 0,1,2,3,4,5 but with followers on node 6,7 (not started yet) + //And two extra partitions 6,7, which we don't intend on throttling. AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map( 0 -> Seq(100, 106), //Throttled 1 -> Seq(101, 106), //Throttled @@ -99,7 +98,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness { )) val msg = msg100KB - val msgCount: Int = 1000 + val msgCount = 100 val expectedDuration = 10 //Keep the test to N seconds var throttle: Long = msgCount * msg.length / expectedDuration if (!leaderThrottle) throttle = throttle * 3 //Follower throttle needs to replicate 3x as fast to get the same duration as there are three replicas to replicate for each of the two follower brokers @@ -108,16 +107,16 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness { (100 to 107).foreach { brokerId => changeBrokerConfig(zkUtils, Seq(brokerId), property(KafkaConfig.ThrottledReplicationRateLimitProp, throttle.toString)) } - if (leaderThrottle) - changeTopicConfig(zkUtils, topic, property(ThrottledReplicasListProp, "0:100,1:101,2:102,3:103,4:104,5:105")) //partition-broker:... throttle the 6 leaders - else - changeTopicConfig(zkUtils, topic, property(ThrottledReplicasListProp, "0:106,1:106,2:106,3:107,4:107,5:107")) //partition-broker:... throttle the two followers + + //Either throttle the six leaders or the two followers + val throttledReplicas = if (leaderThrottle) "0:100,1:101,2:102,3:103,4:104,5:105" else "0:106,1:106,2:106,3:107,4:107,5:107" + changeTopicConfig(zkUtils, topic, property(ThrottledReplicasListProp, throttledReplicas)) //Add data equally to each partition producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(brokers), retries = 5, acks = 0) (0 until msgCount).foreach { x => (0 to 7).foreach { partition => - producer.send(new ProducerRecord(topic, partition, null, msg)).get + producer.send(new ProducerRecord(topic, partition, null, msg)) } } @@ -130,20 +129,15 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness { val start = System.currentTimeMillis() //When we create the 2 new, empty brokers - brokers = brokers :+ TestUtils.createServer(fromProps(createBrokerConfig(106, zkConnect))) - brokers = brokers :+ TestUtils.createServer(fromProps(createBrokerConfig(107, zkConnect))) + createBrokers(106 to 107) //Check that throttled config correctly migrated to the new brokers (106 to 107).foreach { brokerId => assertEquals(throttle, brokerFor(brokerId).quotaManagers.follower.upperBound()) } if (!leaderThrottle) { - (0 to 2).foreach { partition => - assertTrue(brokerFor(106).quotaManagers.follower.isThrottled(new TopicAndPartition(topic, partition))) - } - (3 to 5).foreach { partition => - assertTrue(brokerFor(107).quotaManagers.follower.isThrottled(new TopicAndPartition(topic, partition))) - } + (0 to 2).foreach { partition => assertTrue(brokerFor(106).quotaManagers.follower.isThrottled(tp(partition))) } + (3 to 5).foreach { partition => assertTrue(brokerFor(107).quotaManagers.follower.isThrottled(tp(partition))) } } //Wait for non-throttled partitions to replicate first @@ -156,33 +150,24 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness { val throttledTook = System.currentTimeMillis() - start - //Check the recorded throttled rate is what we expect - if (leaderThrottle) { - (100 to 105).map(brokerFor(_)).foreach { broker => - val metricName = broker.metrics.metricName("byte-rate", LeaderReplication.toString, "Tracking byte-rate for" + LeaderReplication) - val measuredRate = broker.metrics.metrics.asScala(metricName).value() - info(s"Broker:${broker.config.brokerId} Expected:$throttle, Recorded Rate was:$measuredRate") - assertEquals(throttle, measuredRate, percentError(25, throttle)) - } - } else { - (106 to 107).map(brokerFor(_)).foreach { broker => - val metricName = broker.metrics.metricName("byte-rate", FollowerReplication.toString, "Tracking byte-rate for" + FollowerReplication) - val measuredRate = broker.metrics.metrics.asScala(metricName).value() - info(s"Broker:${broker.config.brokerId} Expected:$throttle, Recorded Rate was:$measuredRate") - assertEquals(throttle, measuredRate, percentError(25, throttle)) - } - } - //Check the times for throttled/unthrottled are each side of what we expect - info(s"Unthrottled took: $unthrottledTook, Throttled took: $throttledTook, for expeted $expectedDuration secs") - assertTrue(s"Unthrottled replication of ${unthrottledTook}ms should be < ${expectedDuration * 1000}ms", - unthrottledTook < expectedDuration * 1000) - assertTrue((s"Throttled replication of ${throttledTook}ms should be > ${expectedDuration * 1000}ms"), - throttledTook > expectedDuration * 1000) - assertTrue((s"Throttled replication of ${throttledTook}ms should be < ${expectedDuration * 1500}ms"), - throttledTook < expectedDuration * 1000 * 1.5) + val throttledLowerBound = expectedDuration * 1000 * 0.9 + val throttledUpperBound = expectedDuration * 1000 * 3 + assertTrue(s"Expected $unthrottledTook < $throttledLowerBound", unthrottledTook < throttledLowerBound) + assertTrue(s"Expected $throttledTook > $throttledLowerBound", throttledTook > throttledLowerBound) + assertTrue(s"Expected $throttledTook < $throttledUpperBound", throttledTook < throttledUpperBound) + + // Check the rate metric matches what we expect. + // In a short test the brokers can be read unfairly, so assert against the average + val rateUpperBound = throttle * 1.1 + val rateLowerBound = throttle * 0.5 + val rate = if (leaderThrottle) avRate(LeaderReplication, 100 to 105) else avRate(FollowerReplication, 106 to 107) + assertTrue(s"Expected ${rate} < $rateUpperBound", rate < rateUpperBound) + assertTrue(s"Expected ${rate} > $rateLowerBound", rate > rateLowerBound) } + def tp(partition: Int): TopicAndPartition = new TopicAndPartition(topic, partition) + @Test def shouldThrottleOldSegments(): Unit = { /** @@ -230,13 +215,31 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness { private def waitForOffsetsToMatch(offset: Int, partitionId: Int, brokerId: Int): Boolean = { waitUntilTrue(() => { - offset == brokerFor(brokerId).getLogManager.getLog(TopicAndPartition(topic, partitionId)).map(_.logEndOffset).getOrElse(0) + offset == brokerFor(brokerId).getLogManager.getLog(TopicAndPartition(topic, partitionId)) + .map(_.logEndOffset).getOrElse(0) }, s"Offsets did not match for partition $partitionId on broker $brokerId", 60000) } private def property(key: String, value: String) = { - new Properties() { put(key, value) } + val props = new Properties() + props.put(key, value) + props + } + + private def brokerFor(id: Int): KafkaServer = brokers.filter(_.config.brokerId == id).head + + def createBrokers(brokerIds: Seq[Int]): Unit = { + brokerIds.foreach { id => + brokers = brokers :+ TestUtils.createServer(fromProps(createBrokerConfig(id, zkConnect))) + } + } + + private def avRate(replicationType: QuotaType, brokers: Seq[Int]): Double = { + brokers.map(brokerFor).map(measuredRate(_, replicationType)).sum / brokers.length } - private def brokerFor(id: Int): KafkaServer = brokers.filter(_.config.brokerId == id)(0) + private def measuredRate(broker: KafkaServer, repType: QuotaType): Double = { + val metricName = broker.metrics.metricName("byte-rate", repType.toString) + broker.metrics.metrics.asScala(metricName).value + } }