From: GitBox
To: jira@kafka.apache.org
Subject: [GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
Date: Tue, 03 Nov 2020 14:00:08 -0000

splett2 commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r516301423


##########
File path: core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
##########

@@ -240,6 +256,16 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
       s"Admin client connection not closed (initial = $initialConnectionCount, current = $connectionCount)")
   }
+  private def updateIpConnectionRate(ip: Option[String], updatedRate: Int): Unit = {
+    adminZkClient.changeIpConfig(ip.getOrElse(ConfigEntityName.Default),
+      CoreUtils.propsWith(DynamicConfig.Ip.IpConnectionRateOverrideProp, updatedRate.toString))
+    // use a random throwaway address if ip isn't specified to get the default value
+    TestUtils.waitUntilTrue(() => servers.head.socketServer.connectionQuotas.
+      connectionRateForIp(InetAddress.getByName(ip.getOrElse("255.255.3.4"))) == updatedRate,

Review comment:
   This is admittedly a little weird. If `None` is given as the IP, we want to update the default connection rate. To verify that the default connection rate was updated, we need to call `connectionRateForIp` with some arbitrary IP address that hasn't been given a specific override. In this case, I used an arbitrary IP, `255.255.3.4`, as mentioned in the comment.
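   For illustration, a minimal standalone sketch of the trick described above: probe with an arbitrary, never-overridden address to observe the default rate. The `IpRateQuotas` class and its methods here are hypothetical stand-ins, not Kafka's `ConnectionQuotas`; the real test goes through `adminZkClient` and `TestUtils.waitUntilTrue` as shown in the diff.

```scala
import java.net.InetAddress
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for the per-IP quota bookkeeping; not Kafka's ConnectionQuotas.
class IpRateQuotas(private var defaultRate: Int) {
  private val overrides = TrieMap.empty[InetAddress, Int]

  def setOverride(ip: InetAddress, rate: Int): Unit = overrides.put(ip, rate)
  def setDefault(rate: Int): Unit = defaultRate = rate

  // Any address without an explicit override falls back to the default rate.
  def connectionRateForIp(ip: InetAddress): Int = overrides.getOrElse(ip, defaultRate)
}

object ThrowawayIpProbe {
  def main(args: Array[String]): Unit = {
    val quotas = new IpRateQuotas(10)
    quotas.setOverride(InetAddress.getByName("10.0.0.1"), 5)

    // Update the default, then probe with an arbitrary address that has no override:
    // the probe observes the new default, while the overridden IP keeps its own rate.
    quotas.setDefault(20)
    assert(quotas.connectionRateForIp(InetAddress.getByName("255.255.3.4")) == 20)
    assert(quotas.connectionRateForIp(InetAddress.getByName("10.0.0.1")) == 5)
    println("default-rate update is visible via a throwaway IP probe")
  }
}
```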
##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########

@@ -1246,7 +1337,57 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = {
     // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if
     // the rate limit increases, because it is just one connection per listener and the code is simpler that way
-    updateConnectionRateQuota(maxConnectionRate)
+    updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs for updated IPs.
+   * If an IP is given, metric config will be updated only for the given IP, otherwise
+   * all metric configs will be checked and updated if required
+   *
+   * @param ip ip to update or default if None
+   * @param maxConnectionRate new connection rate, or resets entity to default if None
+   */
+  def updateIpConnectionRate(ip: Option[String], maxConnectionRate: Option[Int]): Unit = {
+    def isIpConnectionRateMetric(metricName: MetricName) = {
+      metricName.name == ConnectionRateMetricName &&
+      metricName.group == MetricsGroup &&
+      metricName.tags.containsKey(IpMetricTag)
+    }
+
+    def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
+      quotaLimit != metric.config.quota.bound
+    }
+
+    ip match {
+      case Some(addr) =>
+        val address = InetAddress.getByName(addr)
+        maxConnectionRate match {
+          case Some(rate) =>
+            info(s"Updating max connection rate override for $address to $rate")
+            connectionRatePerIp.put(address, rate)
+          case None =>
+            info(s"Removing max connection rate override for $address")
+            connectionRatePerIp.remove(address)
+        }
+        updateConnectionRateQuota(connectionRateForIp(address), IpQuotaEntity(address))
+      case None =>
+        val newQuota = maxConnectionRate.getOrElse(DynamicConfig.Ip.DefaultConnectionCreationRate)
+        info(s"Updating default max IP connection rate to $newQuota")
+        defaultConnectionRatePerIp = newQuota
+        val allMetrics = metrics.metrics
+        allMetrics.forEach { (metricName, metric) =>
+          if (isIpConnectionRateMetric(metricName) && shouldUpdateQuota(metric, newQuota)) {
+            info(s"Updating existing connection rate sensor for ${metricName.tags} to $newQuota")
+            metric.config(rateQuotaMetricConfig(newQuota))
+          }

Review comment:
   @dajac good catch, we shouldn't be using `newQuota` here. I agree, this should have been covered by testing; I'll work on adding some more unit tests for the metric config updating.

   @apovzner It should handle that case, yeah. If the default is set to some value, when we remove the quota for an IP, e.g. `updateConnectionRate(Some(ip), None)`, we remove the connection rate entry from the map and then call `getOrDefault(ip, defaultConnectionRatePerIp)`, which should be whatever the non-unlimited per-IP default quota is.
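   To make the fallback described in that reply concrete, here is a small self-contained sketch of how removing a per-IP override leaves the non-unlimited default in effect via `getOrDefault`. The object and method bodies below are simplified assumptions for illustration, not the actual `ConnectionQuotas` code, which additionally rebuilds the per-IP metric config through `updateConnectionRateQuota`.

```scala
import java.net.InetAddress
import java.util.concurrent.ConcurrentHashMap

// Simplified model of the override-removal path discussed above; the real code also
// updates the per-IP quota metric config, which is omitted here.
object RemoveOverrideSketch {
  private val connectionRatePerIp = new ConcurrentHashMap[InetAddress, Int]()
  @volatile private var defaultConnectionRatePerIp = 15

  // Effective rate: an explicit override if present, otherwise the current default.
  def connectionRateForIp(ip: InetAddress): Int =
    connectionRatePerIp.getOrDefault(ip, defaultConnectionRatePerIp)

  def updateIpConnectionRate(ip: InetAddress, maxConnectionRate: Option[Int]): Unit =
    maxConnectionRate match {
      case Some(rate) => connectionRatePerIp.put(ip, rate)
      case None       => connectionRatePerIp.remove(ip) // next lookup falls back to the default
    }

  def main(args: Array[String]): Unit = {
    val ip = InetAddress.getByName("192.168.1.7") // arbitrary example address
    updateIpConnectionRate(ip, Some(5))
    assert(connectionRateForIp(ip) == 5)

    // Removing the override leaves the (non-unlimited) per-IP default in effect.
    updateIpConnectionRate(ip, None)
    assert(connectionRateForIp(ip) == defaultConnectionRatePerIp)
    println(s"effective rate after removing the override: ${connectionRateForIp(ip)}")
  }
}
```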