kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-4225; Replication Quotas: Control Leader & Follower Limit Separately
Date Fri, 30 Sep 2016 10:23:32 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 2b431b551 -> 946af8e1f


KAFKA-4225; Replication Quotas: Control Leader & Follower Limit Separately

Author: Ben Stopford <benstopford@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #1932 from benstopford/KAFKA-4225-over-KAFKA-4216

(cherry picked from commit 0bc1f41fc02062ff224f12ffac053ea3753ecb98)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/946af8e1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/946af8e1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/946af8e1

Branch: refs/heads/0.10.1
Commit: 946af8e1f0dc4af0fed1f6a531ba964941b2b3e6
Parents: 2b431b5
Author: Ben Stopford <benstopford@gmail.com>
Authored: Fri Sep 30 10:06:09 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Sep 30 11:23:27 2016 +0100

----------------------------------------------------------------------
 .../kafka/admin/ReassignPartitionsCommand.scala | 30 +++++++----
 core/src/main/scala/kafka/log/LogConfig.scala   | 12 ++---
 .../main/scala/kafka/server/ConfigHandler.scala | 12 +++--
 .../main/scala/kafka/server/DynamicConfig.scala | 17 +++++--
 .../test/scala/unit/kafka/admin/AdminTest.scala | 13 +++--
 .../admin/ReassignPartitionsCommandTest.scala   | 52 ++++++++++++++++++--
 .../kafka/admin/ReplicationQuotaUtils.scala     | 21 ++++----
 .../unit/kafka/server/DynamicConfigTest.scala   | 11 ++++-
 .../kafka/server/ReplicationQuotasTest.scala    | 10 ++--
 9 files changed, 131 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/946af8e1/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index dccc37c..06e4120 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -23,8 +23,7 @@ import scala.collection._
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import kafka.common.{AdminCommandFailedException, TopicAndPartition}
 import kafka.log.LogConfig
-import kafka.log.LogConfig._
-import kafka.utils.CoreUtils._
+import LogConfig._
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.security.JaasUtils
 
@@ -79,11 +78,14 @@ object ReassignPartitionsCommand extends Logging {
     var changed = false
 
     //If all partitions have completed remove the throttle
-    if (reassignedPartitionsStatus.forall { case (topicPartition, status) => status ==
ReassignmentCompleted }) {
+    if (reassignedPartitionsStatus.forall { case (_, status) => status == ReassignmentCompleted
}) {
       //Remove the throttle limit from all brokers in the cluster
+      //(as we no longer know which specific brokers were involved in the move)
       for (brokerId <- zkUtils.getAllBrokersInCluster().map(_.id)) {
         val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Broker, brokerId.toString)
-        if (configs.remove(DynamicConfig.Broker.ThrottledReplicationRateLimitProp) != null){
+        // bitwise OR as we don't want to short-circuit
+        if (configs.remove(DynamicConfig.Broker.ThrottledLeaderReplicationRateProp) != null
+          | configs.remove(DynamicConfig.Broker.ThrottledFollowerReplicationRateProp) !=
null){
           AdminUtils.changeBrokerConfig(zkUtils, Seq(brokerId), configs)
           changed = true
         }
@@ -93,8 +95,9 @@ object ReassignPartitionsCommand extends Logging {
       val topics = partitionsToBeReassigned.keySet.map(tp => tp.topic).toSeq.distinct
       for (topic <- topics) {
         val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
+        // bitwise OR as we don't want to short-circuit
         if (configs.remove(LogConfig.LeaderThrottledReplicasListProp) != null
-          || configs.remove(LogConfig.FollowerThrottledReplicasListProp) != null){
+          | configs.remove(LogConfig.FollowerThrottledReplicasListProp) != null){
           AdminUtils.changeTopicConfig(zkUtils, topic, configs)
           changed = true
         }
@@ -315,6 +318,10 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment:
Map[TopicA
     }
   }
 
+  /**
+    * Limit the throttle on currently moving replicas. Note that this command can be used
to alter the throttle, but
+    * it may not alter all limits originally set, if some of the brokers have completed their
rebalance.
+    */
   def maybeLimit(throttle: Long) {
     if (throttle >= 0) {
       val existingBrokers = existingAssignment().values.flatten.toSeq
@@ -323,13 +330,15 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment:
Map[TopicA
 
       for (id <- brokers) {
         val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Broker, id.toString)
-        configs.put(DynamicConfig.Broker.ThrottledReplicationRateLimitProp, throttle.toString)
+        configs.put(DynamicConfig.Broker.ThrottledLeaderReplicationRateProp, throttle.toString)
+        configs.put(DynamicConfig.Broker.ThrottledFollowerReplicationRateProp, throttle.toString)
         AdminUtils.changeBrokerConfig(zkUtils, Seq(id), configs)
       }
       println(s"The throttle limit was set to $throttle B/s")
     }
   }
 
+  /** Set throttles to replicas that are moving. Note: this method should only be used when
the assignment is initiated. */
   private[admin] def assignThrottledReplicas(allExisting: Map[TopicAndPartition, Seq[Int]],
allProposed: Map[TopicAndPartition, Seq[Int]], admin: AdminUtilities = AdminUtils): Unit =
{
     for (topic <- allProposed.keySet.map(_.topic).toSeq) {
       val (existing, proposed) = filterBy(topic, allExisting, allProposed)
@@ -340,9 +349,10 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment:
Map[TopicA
       //Apply a follower throttle to all "move destinations".
       val follower = format(postRebalanceReplicasThatMoved(existing, proposed))
 
-      admin.changeTopicConfig(zkUtils, topic, propsWith(
-        (LeaderThrottledReplicasListProp, leader),
-        (FollowerThrottledReplicasListProp, follower)))
+      val configs = admin.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
+      configs.put(LeaderThrottledReplicasListProp, leader)
+      configs.put(FollowerThrottledReplicasListProp, follower)
+      admin.changeTopicConfig(zkUtils, topic, configs)
 
       debug(s"Updated leader-throttled replicas for topic $topic with: $leader")
       debug(s"Updated follower-throttled replicas for topic $topic with: $follower")
@@ -350,7 +360,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment:
Map[TopicA
   }
 
   private def postRebalanceReplicasThatMoved(existing: Map[TopicAndPartition, Seq[Int]],
proposed: Map[TopicAndPartition, Seq[Int]]): Map[TopicAndPartition, Seq[Int]] = {
-    //For each partition in the proposed list, filter out any replicas that exist now (i.e.
not moving)
+    //For each partition in the proposed list, filter out any replicas that exist now (i.e.
are in the proposed list and hence are not moving)
     existing.map { case (tp, current) =>
       tp -> (proposed(tp).toSet -- current).toSeq
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/946af8e1/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 0a447d4..a934fcd 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -124,8 +124,8 @@ object LogConfig {
   val MessageFormatVersionProp = "message.format.version"
   val MessageTimestampTypeProp = "message.timestamp.type"
   val MessageTimestampDifferenceMaxMsProp = "message.timestamp.difference.max.ms"
-  val LeaderThrottledReplicasListProp = "quota.leader.replication.throttled.replicas"
-  val FollowerThrottledReplicasListProp = "quota.follower.replication.throttled.replicas"
+  val LeaderThrottledReplicasListProp = "leader.replication.throttled.replicas"
+  val FollowerThrottledReplicasListProp = "follower.replication.throttled.replicas"
 
   val SegmentSizeDoc = "This configuration controls the segment file size for " +
     "the log. Retention and cleaning is always done a file at a time so a larger " +
@@ -196,10 +196,10 @@ object LogConfig {
   val MessageTimestampDifferenceMaxMsDoc = "The maximum difference allowed between the timestamp
when a broker receives " +
     "a message and the timestamp specified in the message. If message.timestamp.type=CreateTime,
a message will be rejected " +
     "if the difference in timestamp exceeds this threshold. This configuration is ignored
if message.timestamp.type=LogAppendTime."
-  val LeaderThrottledReplicasListDoc = "A list of replicas for which log replication should
be throttled on the leader. The list should describe a set of " +
-    "replicas in the form [PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:..."
-  val FollowerThrottledReplicasListDoc = "A list of replicas for which log replication should
be throttled on the follower. The list should describe a set of " +
-    "replicas in the form [PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:..."
+  val LeaderThrottledReplicasListDoc = "A list of replicas for which log replication should
be throttled on the leader side. The list should describe a set of " +
+    "replicas in the form [PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... or alternatively
the wildcard '*' can be used to throttle all replicas for this topic."
+  val FollowerThrottledReplicasListDoc = "A list of replicas for which log replication should
be throttled on the follower side. The list should describe a set of " +
+    "replicas in the form [PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... or alternatively
the wildcard '*' can be used to throttle all replicas for this topic."
 
   private class LogConfigDef extends ConfigDef {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/946af8e1/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index c6868c7..82e65a2 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -153,11 +153,17 @@ class UserConfigHandler(private val quotaManagers: QuotaManagers) extends
QuotaC
   * This implementation reports the overrides to the respective ReplicationQuotaManager objects
   */
 class BrokerConfigHandler(private val brokerConfig: KafkaConfig, private val quotaManagers:
QuotaManagers) extends ConfigHandler with Logging {
+
   def processConfigChanges(brokerId: String, properties: Properties) {
+    def getOrDefault(prop: String): Long = {
+      if (properties.containsKey(prop))
+        properties.getProperty(prop).toLong
+      else
+        DefaultThrottledReplicationRate
+    }
     if (brokerConfig.brokerId == brokerId.trim.toInt) {
-      val limit = if (properties.containsKey(ThrottledReplicationRateLimitProp)) properties.getProperty(ThrottledReplicationRateLimitProp).toLong
else DefaultThrottledReplicationRateLimit
-      quotaManagers.leader.updateQuota(upperBound(limit))
-      quotaManagers.follower.updateQuota(upperBound(limit))
+      quotaManagers.leader.updateQuota(upperBound(getOrDefault(ThrottledLeaderReplicationRateProp)))
+      quotaManagers.follower.updateQuota(upperBound(getOrDefault(ThrottledFollowerReplicationRateProp)))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/946af8e1/core/src/main/scala/kafka/server/DynamicConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala
index 51e9818..8a62af8 100644
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfig.scala
@@ -18,6 +18,7 @@
 package kafka.server
 
 import java.util.Properties
+import kafka.log.LogConfig
 import org.apache.kafka.common.config.ConfigDef
 import org.apache.kafka.common.config.ConfigDef.Importance._
 import org.apache.kafka.common.config.ConfigDef.Range._
@@ -32,19 +33,25 @@ object DynamicConfig {
 
   object Broker {
     //Properties
-    val ThrottledReplicationRateLimitProp = "replication.quota.throttled.rate"
+    val ThrottledLeaderReplicationRateProp = "leader.replication.throttled.rate"
+    val ThrottledFollowerReplicationRateProp = "follower.replication.throttled.rate"
 
     //Defaults
-    val DefaultThrottledReplicationRateLimit = ReplicationQuotaManagerConfig.QuotaBytesPerSecondDefault
+    val DefaultThrottledReplicationRate = ReplicationQuotaManagerConfig.QuotaBytesPerSecondDefault
 
     //Documentation
-    val ThrottledReplicationRateLimitDoc = "A long representing the upper bound (bytes/sec)
on replication traffic for replicas enumerated in the " +
-      s"property $ThrottledReplicationRateLimitProp. This property can be only set dynamically
via the config command etc. The minimum value is 1 KB/s."
+    val ThrottledLeaderReplicationRateDoc = "A long representing the upper bound (bytes/sec)
on replication traffic for leaders enumerated in the " +
+      s"property ${LogConfig.LeaderThrottledReplicasListProp} (for each topic). This property
can be only set dynamically. It is suggested that the " +
+      s"limit be kept above 1MB/s for accurate behaviour."
+    val ThrottledFollowerReplicationRateDoc = "A long representing the upper bound (bytes/sec)
on replication traffic for followers enumerated in the " +
+      s"property ${LogConfig.FollowerThrottledReplicasListProp} (for each topic). This property
can be only set dynamically. It is suggested that the " +
+      s"limit be kept above 1MB/s for accurate behaviour."
 
     //Definitions
     private val brokerConfigDef = new ConfigDef()
       //round minimum value down, to make it easier for users.
-      .define(ThrottledReplicationRateLimitProp, LONG, DefaultThrottledReplicationRateLimit,
atLeast(1000), MEDIUM, ThrottledReplicationRateLimitDoc)
+      .define(ThrottledLeaderReplicationRateProp, LONG, DefaultThrottledReplicationRate,
atLeast(0), MEDIUM, ThrottledLeaderReplicationRateDoc)
+      .define(ThrottledFollowerReplicationRateProp, LONG, DefaultThrottledReplicationRate,
atLeast(0), MEDIUM, ThrottledFollowerReplicationRateDoc)
 
     def names = brokerConfigDef.names
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/946af8e1/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 609e9a2..ee980e2 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -477,23 +477,28 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest
{
       val limit: Long = 1000000
 
       // Set the limit & check it is applied to the log
-      changeBrokerConfig(servers(0).zkUtils, brokerIds,  propsWith(ThrottledReplicationRateLimitProp,
limit.toString))
+      changeBrokerConfig(zkUtils, brokerIds, propsWith(
+        (ThrottledLeaderReplicationRateProp, limit.toString),
+        (ThrottledFollowerReplicationRateProp, limit.toString)))
       checkConfig(limit)
 
       // Now double the config values for the topic and check that it is applied
       val newLimit = 2 * limit
-      changeBrokerConfig(servers(0).zkUtils, brokerIds,  propsWith(ThrottledReplicationRateLimitProp,
newLimit.toString))
+      changeBrokerConfig(zkUtils, brokerIds,  propsWith(
+        (ThrottledLeaderReplicationRateProp, newLimit.toString),
+        (ThrottledFollowerReplicationRateProp, newLimit.toString)))
       checkConfig(newLimit)
 
       // Verify that the same config can be read from ZK
       for (brokerId <- brokerIds) {
         val configInZk = AdminUtils.fetchEntityConfig(servers(brokerId).zkUtils, ConfigType.Broker,
brokerId.toString)
-        assertEquals(newLimit, configInZk.getProperty(ThrottledReplicationRateLimitProp).toInt)
+        assertEquals(newLimit, configInZk.getProperty(ThrottledLeaderReplicationRateProp).toInt)
+        assertEquals(newLimit, configInZk.getProperty(ThrottledFollowerReplicationRateProp).toInt)
       }
 
       //Now delete the config
       changeBrokerConfig(servers(0).zkUtils, brokerIds, new Properties)
-      checkConfig(DefaultThrottledReplicationRateLimit)
+      checkConfig(DefaultThrottledReplicationRate)
 
     } finally {
       servers.foreach(_.shutdown())

http://git-wip-us.apache.org/repos/asf/kafka/blob/946af8e1/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index f66dbed..0e96795 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -20,12 +20,13 @@ import java.util.Properties
 
 import kafka.common.TopicAndPartition
 import kafka.log.LogConfig._
-import kafka.utils.{Logging, TestUtils, ZkUtils}
+import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
-import org.junit.Test
+import org.junit.{Before, Test}
 import org.junit.Assert.assertEquals
 
 class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest
{
+  var calls = 0
 
   @Test
   def testRackAwareReassign() {
@@ -67,10 +68,12 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with
Logging wi
       override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties):
Unit = {
         assertEquals("0:102", configChange.get(FollowerThrottledReplicasListProp)) //Should
only be follower-throttle the moving replica
         assertEquals("0:100,0:101", configChange.get(LeaderThrottledReplicasListProp)) //Should
leader-throttle all existing (pre move) replicas
+        calls += 1
       }
     }
 
     assigner.assignThrottledReplicas(existing, proposed, mock)
+    assertEquals(1, calls)
   }
 
   @Test
@@ -87,11 +90,13 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with
Logging wi
       override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties):
Unit = {
         assertEquals("0:102,1:102", configChange.get(FollowerThrottledReplicasListProp))
//Should only be follower-throttle the moving replica
         assertEquals("0:100,0:101,1:100,1:101", configChange.get(LeaderThrottledReplicasListProp))
//Should leader-throttle all existing (pre move) replicas
+        calls += 1
       }
     }
 
     //When
     assigner.assignThrottledReplicas(existing, proposed, mock)
+    assertEquals(1, calls)
   }
 
   @Test
@@ -108,18 +113,20 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with
Logging wi
       override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties):
Unit = {
         topic match {
           case "topic1" =>
-            assertEquals("0:102", configChange.get(FollowerThrottledReplicasListProp))
             assertEquals("0:100,0:101", configChange.get(LeaderThrottledReplicasListProp))
+            assertEquals("0:102", configChange.get(FollowerThrottledReplicasListProp))
           case "topic2" =>
-            assertEquals("0:100", configChange.get(FollowerThrottledReplicasListProp))
             assertEquals("0:101,0:102", configChange.get(LeaderThrottledReplicasListProp))
+            assertEquals("0:100", configChange.get(FollowerThrottledReplicasListProp))
           case _ => fail("Unexpected topic $topic")
         }
+        calls += 1
       }
     }
 
     //When
     assigner.assignThrottledReplicas(existing, proposed, mock)
+    assertEquals(2, calls)
   }
 
   @Test
@@ -152,14 +159,15 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with
Logging wi
             assertEquals("0:101,0:102,1:101,1:102", configChange.get(LeaderThrottledReplicasListProp))
           case _ => fail()
         }
+        calls += 1
       }
     }
 
     //When
     assigner.assignThrottledReplicas(existing, proposed, mock)
+    assertEquals(2, calls)
   }
 
-
   @Test
   def shouldFindTwoMovingReplicasInSamePartition() {
     val control = TopicAndPartition("topic1", 1) -> Seq(100, 102)
@@ -174,10 +182,44 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with
Logging wi
       override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties)
= {
         assertEquals("0:104,0:105", configChange.get(FollowerThrottledReplicasListProp))
//Should only be follower-throttle the moving replicas
         assertEquals("0:100,0:101,0:102,0:103", configChange.get(LeaderThrottledReplicasListProp))
//Should leader-throttle all existing (pre move) replicas
+        calls += 1
       }
     }
 
     //When
     assigner.assignThrottledReplicas(existing, proposed, mock)
+    assertEquals(1, calls)
+  }
+
+  @Test
+  def shouldNotOverwriteEntityConfigsWhenUpdatingThrottledReplicas(): Unit = {
+    val control = TopicAndPartition("topic1", 1) -> Seq(100, 102)
+    val assigner = new ReassignPartitionsCommand(null, null)
+    val existing = Map(TopicAndPartition("topic1", 0) -> Seq(100, 101), control)
+    val proposed = Map(TopicAndPartition("topic1", 0) -> Seq(101, 102), control)
+
+    //Given there are existing properties
+    val existingProperties = CoreUtils.propsWith("some-key", "some-value")
+
+    //Then the dummy property should still be there
+    val mock = new TestAdminUtils {
+      override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties):
Unit = {
+        assertEquals("some-value", configChange.getProperty("some-key"))
+        calls += 1
+      }
+
+      override def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String):
Properties = {
+        existingProperties
+      }
+    }
+
+    //When
+    assigner.assignThrottledReplicas(existing, proposed, mock)
+    assertEquals(1, calls)
+  }
+
+  @Before
+  def setup(): Unit = {
+    calls = 0
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/946af8e1/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala b/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
index 004067b..dc50f61 100644
--- a/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
@@ -13,7 +13,7 @@
 package kafka.admin
 
 import kafka.log.LogConfig
-import kafka.server.{DynamicConfig, KafkaConfig, ConfigType, KafkaServer}
+import kafka.server.{DynamicConfig, ConfigType, KafkaServer}
 import kafka.utils.TestUtils
 
 import scala.collection.Seq
@@ -22,14 +22,15 @@ object ReplicationQuotaUtils {
 
   def checkThrottleConfigRemovedFromZK(topic: String, servers: Seq[KafkaServer]): Boolean
= {
     TestUtils.waitUntilTrue(() => {
-      val brokerReset = servers.forall { server =>
+      val hasRateProp = servers.forall { server =>
         val brokerConfig = AdminUtils.fetchEntityConfig(server.zkUtils, ConfigType.Broker,
server.config.brokerId.toString)
-        !brokerConfig.contains(DynamicConfig.Broker.ThrottledReplicationRateLimitProp)
+        brokerConfig.contains(DynamicConfig.Broker.ThrottledLeaderReplicationRateProp) ||
+          brokerConfig.contains(DynamicConfig.Broker.ThrottledFollowerReplicationRateProp)
       }
       val topicConfig = AdminUtils.fetchEntityConfig(servers(0).zkUtils, ConfigType.Topic,
topic)
-      val topicReset = !(topicConfig.contains(LogConfig.LeaderThrottledReplicasListProp)
-        || topicConfig.contains(LogConfig.FollowerThrottledReplicasListProp))
-      brokerReset && topicReset
+      val hasReplicasProp = topicConfig.contains(LogConfig.LeaderThrottledReplicasListProp)
||
+        topicConfig.contains(LogConfig.FollowerThrottledReplicasListProp)
+      !hasRateProp && !hasReplicasProp
     }, "Throttle limit/replicas was not unset")
   }
 
@@ -38,8 +39,10 @@ object ReplicationQuotaUtils {
       //Check for limit in ZK
       val brokerConfigAvailable = servers.forall { server =>
         val configInZk = AdminUtils.fetchEntityConfig(server.zkUtils, ConfigType.Broker,
server.config.brokerId.toString)
-        val zkThrottleRate = configInZk.getProperty(DynamicConfig.Broker.ThrottledReplicationRateLimitProp)
-        zkThrottleRate != null && expectedThrottleRate == zkThrottleRate.toLong
+        val zkLeaderRate = configInZk.getProperty(DynamicConfig.Broker.ThrottledLeaderReplicationRateProp)
+        val zkFollowerRate = configInZk.getProperty(DynamicConfig.Broker.ThrottledFollowerReplicationRateProp)
+        zkLeaderRate != null && expectedThrottleRate == zkLeaderRate.toLong &&
+          zkFollowerRate != null && expectedThrottleRate == zkFollowerRate.toLong
       }
       //Check replicas assigned
       val topicConfig = AdminUtils.fetchEntityConfig(servers(0).zkUtils, ConfigType.Topic,
topic)
@@ -49,4 +52,4 @@ object ReplicationQuotaUtils {
       brokerConfigAvailable && topicConfigAvailable
     }, "throttle limit/replicas was not set")
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/946af8e1/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
index c0fc08b..c481dc7 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
@@ -52,7 +52,14 @@ class DynamicConfigTest {
   }
 
   @Test(expected = classOf[ConfigException])
-  def shouldFailConfigsWithInvalidValues() {
-    AdminUtils.changeBrokerConfig(zkUtils, Seq(0), propsWith(DynamicConfig.Broker.ThrottledReplicationRateLimitProp,
"-100"))
+  def shouldFailLeaderConfigsWithInvalidValues() {
+    AdminUtils.changeBrokerConfig(zkUtils, Seq(0),
+      propsWith(DynamicConfig.Broker.ThrottledLeaderReplicationRateProp, "-100"))
+  }
+
+  @Test(expected = classOf[ConfigException])
+  def shouldFailFollowerConfigsWithInvalidValues() {
+    AdminUtils.changeBrokerConfig(zkUtils, Seq(0),
+      propsWith(DynamicConfig.Broker.ThrottledFollowerReplicationRateProp, "-100"))
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/946af8e1/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 3fc6f7d..9107067 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -105,7 +105,11 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
 
     //Set the throttle limit on all 8 brokers, but only assign throttled replicas to the
six leaders, or two followers
     (100 to 107).foreach { brokerId =>
-      changeBrokerConfig(zkUtils, Seq(brokerId), propsWith(DynamicConfig.Broker.ThrottledReplicationRateLimitProp,
throttle.toString))
+      changeBrokerConfig(zkUtils, Seq(brokerId),
+        propsWith(
+          (DynamicConfig.Broker.ThrottledLeaderReplicationRateProp, throttle.toString),
+          (DynamicConfig.Broker.ThrottledFollowerReplicationRateProp, throttle.toString)
+        ))
     }
 
     //Either throttle the six leaders or the two followers
@@ -188,8 +192,8 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
     val expectedDuration = 4
     val throttle: Long = msg.length * msgCount / expectedDuration
 
-    //Set the throttle limit leader
-    changeBrokerConfig(zkUtils, Seq(100), propsWith(DynamicConfig.Broker.ThrottledReplicationRateLimitProp,
throttle.toString))
+    //Set the throttle to only limit leader
+    changeBrokerConfig(zkUtils, Seq(100), propsWith(DynamicConfig.Broker.ThrottledLeaderReplicationRateProp,
throttle.toString))
     changeTopicConfig(zkUtils, topic, propsWith(LeaderThrottledReplicasListProp, "0:100"))
 
     //Add data


Mime
View raw message