kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
Date Tue, 03 Nov 2020 14:00:08 GMT

splett2 commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r516301423



##########
File path: core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
##########
@@ -240,6 +256,16 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
       s"Admin client connection not closed (initial = $initialConnectionCount, current =
$connectionCount)")
   }
 
+  private def updateIpConnectionRate(ip: Option[String], updatedRate: Int): Unit = {
+    adminZkClient.changeIpConfig(ip.getOrElse(ConfigEntityName.Default),
+      CoreUtils.propsWith(DynamicConfig.Ip.IpConnectionRateOverrideProp, updatedRate.toString))
+    // use a random throwaway address if ip isn't specified to get the default value
+    TestUtils.waitUntilTrue(() => servers.head.socketServer.connectionQuotas.
+      connectionRateForIp(InetAddress.getByName(ip.getOrElse("255.255.3.4"))) == updatedRate,

Review comment:
       This is admittedly a little weird.
   If `None` is given as the IP, we want to update the default connection rate. 
   To verify that the default connection rate was updated, we need to call `connectionRateForIp`
with some arbitrary IP address that hasn't been given a specific override. In this case, I
used an arbitrary IP, `255.255.3.4` as mentioned in the comment.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1246,7 +1337,57 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics)
extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = {
     // if there is a connection waiting on the rate throttle delay, we will let it wait the
original delay even if
     // the rate limit increases, because it is just one connection per listener and the code
is simpler that way
-    updateConnectionRateQuota(maxConnectionRate)
+    updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs for updated
IPs.
+   * If an IP is given, metric config will be updated only for the given IP, otherwise
+   * all metric configs will be checked and updated if required
+   *
+   * @param ip ip to update or default if None
+   * @param maxConnectionRate new connection rate, or resets entity to default if None
+   */
+  def updateIpConnectionRate(ip: Option[String], maxConnectionRate: Option[Int]): Unit =
{
+    def isIpConnectionRateMetric(metricName: MetricName) = {
+      metricName.name == ConnectionRateMetricName &&
+      metricName.group == MetricsGroup &&
+      metricName.tags.containsKey(IpMetricTag)
+    }
+
+    def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
+      quotaLimit != metric.config.quota.bound
+    }
+
+    ip match {
+      case Some(addr) =>
+        val address = InetAddress.getByName(addr)
+        maxConnectionRate match {
+          case Some(rate) =>
+            info(s"Updating max connection rate override for $address to $rate")
+            connectionRatePerIp.put(address, rate)
+          case None =>
+            info(s"Removing max connection rate override for $address")
+            connectionRatePerIp.remove(address)
+        }
+        updateConnectionRateQuota(connectionRateForIp(address), IpQuotaEntity(address))
+      case None =>
+        val newQuota = maxConnectionRate.getOrElse(DynamicConfig.Ip.DefaultConnectionCreationRate)
+        info(s"Updating default max IP connection rate to $newQuota")
+        defaultConnectionRatePerIp = newQuota
+        val allMetrics = metrics.metrics
+        allMetrics.forEach { (metricName, metric) =>
+          if (isIpConnectionRateMetric(metricName) && shouldUpdateQuota(metric, newQuota))
{
+            info(s"Updating existing connection rate sensor for ${metricName.tags} to $newQuota")
+            metric.config(rateQuotaMetricConfig(newQuota))
+          }

Review comment:
       @dajac 
   good catch, we shouldn't be using newQuota here. I agree, this should have been covered
by testing, I'll work on adding some more unit tests for the metric config updating.
   
   @apovzner 
   It should handle that case, yeah.
   If default is set to some value, when we remove quota for an ip, e.g. `updateConnectionRate(Some(ip),
None)`, we remove the connection rate entry from the map and then call `getOrDefault(ip, defaultConnectionRatePerIp)`which
should be whatever the non-unlimited per-IP default quota is.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



Mime
View raw message