kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6753) Speed up event processing on the controller
Date Fri, 06 Apr 2018 02:32:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16427865#comment-16427865
] 

ASF GitHub Bot commented on KAFKA-6753:
---------------------------------------

gitlw closed pull request #4831: KAFKA-6753: Update controller metrics periodically instead
of after processing every event
URL: https://github.com/apache/kafka/pull/4831
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/controller/ControllerState.scala b/core/src/main/scala/kafka/controller/ControllerState.scala
index 17af7775901..b2b0e9e5fc2 100644
--- a/core/src/main/scala/kafka/controller/ControllerState.scala
+++ b/core/src/main/scala/kafka/controller/ControllerState.scala
@@ -90,7 +90,11 @@ object ControllerState {
     def value = 12
   }
 
+  case object UpdateMetrics extends ControllerState {
+    def value = 13
+  }
+
   val values: Seq[ControllerState] = Seq(Idle, ControllerChange, BrokerChange, TopicChange,
TopicDeletion,
     PartitionReassignment, AutoLeaderBalance, ManualLeaderBalance, ControlledShutdown, IsrChange,
LeaderAndIsrResponseReceived,
-    LogDirChange, ControllerShutdown)
+    LogDirChange, ControllerShutdown, UpdateMetrics)
 }
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 4778a7a6e89..a64003491dd 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -70,7 +70,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient,
time: Ti
 
   // visible for testing
   private[controller] val eventManager = new ControllerEventManager(config.brokerId,
-    controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics())
+    controllerContext.stats.rateAndTimeMetrics, _ => updateMetricsPending = true)
 
   val topicDeletionManager = new TopicDeletionManager(this, eventManager, zkClient)
   private val brokerRequestBatch = new ControllerBrokerRequestBatch(this, stateChangeLogger)
@@ -93,6 +93,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient,
time: Ti
   @volatile private var preferredReplicaImbalanceCount = 0
   @volatile private var globalTopicCount = 0
   @volatile private var globalPartitionCount = 0
+  @volatile private var updateMetricsPending = false
 
   /* single-thread scheduler to clean expired tokens */
   private val tokenCleanScheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "delegation-token-cleaner")
@@ -255,6 +256,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient,
time: Ti
     if (config.autoLeaderRebalanceEnable) {
       scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
     }
+    scheduleUpdateMetricsTask(delay = 5, unit = TimeUnit.SECONDS)
 
     if (config.tokenAuthEnabled) {
       info("starting the token expiry check scheduler")
@@ -266,6 +268,11 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient,
time: Ti
     }
   }
 
+  private def scheduleUpdateMetricsTask(delay: Long, unit: TimeUnit): Unit = {
+    kafkaScheduler.schedule("controller-update-metrics-task", () => eventManager.put(UpdateMetricsTask),
+      delay = delay, unit = unit)
+  }
+
   private def scheduleAutoLeaderRebalanceTask(delay: Long, unit: TimeUnit): Unit = {
     kafkaScheduler.schedule("auto-leader-rebalance-task", () => eventManager.put(AutoPreferredReplicaLeaderElection),
       delay = delay, unit = unit)
@@ -994,6 +1001,19 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient,
time: Ti
     }
   }
 
+  case object UpdateMetricsTask extends ControllerEvent {
+    def state = ControllerState.UpdateMetrics
+
+    override def process(): Unit = {
+      if (!isActive) return
+      try {
+        maybeUpdateMetrics()
+      } finally {
+        scheduleUpdateMetricsTask(delay = config.controllerUpdateMetricsIntervalMs, unit
= TimeUnit.MILLISECONDS)
+      }
+    }
+  }
+
   case object AutoPreferredReplicaLeaderElection extends ControllerEvent {
 
     def state = ControllerState.AutoLeaderBalance
@@ -1130,7 +1150,11 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient,
time: Ti
 
   }
 
-  private def updateMetrics(): Unit = {
+  private def maybeUpdateMetrics(): Unit = {
+    if (!updateMetricsPending)
+      return
+    updateMetricsPending = false
+    
     offlinePartitionCount =
       if (!isActive) {
         0
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 5a1dca395bb..8e90ef62f33 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -137,6 +137,7 @@ object Defaults {
   val AutoLeaderRebalanceEnable = true
   val LeaderImbalancePerBrokerPercentage = 10
   val LeaderImbalanceCheckIntervalSeconds = 300
+  val ControllerUpdateMetricsIntervalMs = 1000L
   val UncleanLeaderElectionEnable = false
   val InterBrokerSecurityProtocol = SecurityProtocol.PLAINTEXT.toString
   val InterBrokerProtocolVersion = ApiVersion.latestVersion.toString
@@ -344,6 +345,7 @@ object KafkaConfig {
   val AutoLeaderRebalanceEnableProp = "auto.leader.rebalance.enable"
   val LeaderImbalancePerBrokerPercentageProp = "leader.imbalance.per.broker.percentage"
   val LeaderImbalanceCheckIntervalSecondsProp = "leader.imbalance.check.interval.seconds"
+  val ControllerUpdateMetricsIntervalMsProp = "controller.update.metrics.interval.ms"
   val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
   val InterBrokerSecurityProtocolProp = "security.inter.broker.protocol"
   val InterBrokerProtocolVersionProp = "inter.broker.protocol.version"
@@ -615,6 +617,7 @@ object KafkaConfig {
   val AutoLeaderRebalanceEnableDoc = "Enables auto leader balancing. A background thread
checks and triggers leader balance if required at regular intervals"
   val LeaderImbalancePerBrokerPercentageDoc = "The ratio of leader imbalance allowed per
broker. The controller would trigger a leader balance if it goes above this value per broker.
The value is specified in percentage."
   val LeaderImbalanceCheckIntervalSecondsDoc = "The frequency with which the partition rebalance
check is triggered by the controller"
+  val ControllerUpdateMetricsIntervalMsDoc = "The frequency with which the metrics on controller
are updated"
   val UncleanLeaderElectionEnableDoc = "Indicates whether to enable replicas not in the ISR
set to be elected as leader as a last resort, even though doing so may result in data loss"
   val InterBrokerSecurityProtocolDoc = "Security protocol used to communicate between brokers.
Valid values are: " +
     s"${SecurityProtocol.names.asScala.mkString(", ")}. It is an error to set this and $InterBrokerListenerNameProp
" +
@@ -855,6 +858,7 @@ object KafkaConfig {
       .define(AutoLeaderRebalanceEnableProp, BOOLEAN, Defaults.AutoLeaderRebalanceEnable,
HIGH, AutoLeaderRebalanceEnableDoc)
       .define(LeaderImbalancePerBrokerPercentageProp, INT, Defaults.LeaderImbalancePerBrokerPercentage,
HIGH, LeaderImbalancePerBrokerPercentageDoc)
       .define(LeaderImbalanceCheckIntervalSecondsProp, LONG, Defaults.LeaderImbalanceCheckIntervalSeconds,
HIGH, LeaderImbalanceCheckIntervalSecondsDoc)
+      .define(ControllerUpdateMetricsIntervalMsProp, LONG, Defaults.ControllerUpdateMetricsIntervalMs,
HIGH, ControllerUpdateMetricsIntervalMsDoc)
       .define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable,
HIGH, UncleanLeaderElectionEnableDoc)
       .define(InterBrokerSecurityProtocolProp, STRING, Defaults.InterBrokerSecurityProtocol,
MEDIUM, InterBrokerSecurityProtocolDoc)
       .define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion,
MEDIUM, InterBrokerProtocolVersionDoc)
@@ -1119,6 +1123,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   val autoLeaderRebalanceEnable = getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp)
   val leaderImbalancePerBrokerPercentage = getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp)
   val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)
+  val controllerUpdateMetricsIntervalMs = getLong(KafkaConfig.ControllerUpdateMetricsIntervalMsProp)
   def uncleanLeaderElectionEnable: java.lang.Boolean = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
 
   // We keep the user-provided String as `ApiVersion.apply` can choose a slightly different
version (eg if `0.10.0`
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 0213c12814e..e33320c959e 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -635,6 +635,7 @@ class KafkaConfigTest {
         case KafkaConfig.AutoLeaderRebalanceEnableProp => assertPropertyInvalid(getBaseProperties(),
name, "not_a_boolean", "0")
         case KafkaConfig.LeaderImbalancePerBrokerPercentageProp => assertPropertyInvalid(getBaseProperties(),
name, "not_a_number")
         case KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp => assertPropertyInvalid(getBaseProperties(),
name, "not_a_number")
+        case KafkaConfig.ControllerUpdateMetricsIntervalMsProp => assertPropertyInvalid(getBaseProperties(),
name, "not_a_number")
         case KafkaConfig.UncleanLeaderElectionEnableProp => assertPropertyInvalid(getBaseProperties(),
name, "not_a_boolean", "0")
         case KafkaConfig.ControlledShutdownMaxRetriesProp => assertPropertyInvalid(getBaseProperties(),
name, "not_a_number")
         case KafkaConfig.ControlledShutdownRetryBackoffMsProp => assertPropertyInvalid(getBaseProperties(),
name, "not_a_number")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Speed up event processing on the controller 
> --------------------------------------------
>
>                 Key: KAFKA-6753
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6753
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Lucas Wang
>            Assignee: Lucas Wang
>            Priority: Minor
>         Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png
>
>
> The existing controller code updates metrics after processing every event. This can slow
down event processing on the controller tremendously. In one profiling run, we saw that updating
metrics took nearly 100% of the CPU time of the controller's event-processing thread. Specifically,
the slowness can be attributed to two factors:
> 1. Each metrics update is expensive. Specifically, calculating
the offline partition count requires iterating through all the partitions in the cluster
to check whether each partition is offline, and calculating the preferred replica imbalance count
requires iterating through all the partitions in the cluster to check whether a partition has a
leader other than its preferred leader. In a large cluster, the number of partitions can be
quite large, and the controller sees all of them. Even if the time spent checking a single partition
is small, the accumulated cost across so many partitions can make each
metrics update quite expensive. One might argue that the per-partition logic is simply
not optimized; however, we checked the CPU percentage of leaf nodes in the profiling
result and found that inside the loops over collection objects (e.g. the set of all partitions),
no single function dominates the processing. Hence the large number of partitions in a
cluster is the main contributor to the slowness of a single metrics update.
> 2. The metrics update is invoked many times when there is a high number of
events to be processed by the controller: once after processing each event.
> The patch that will be submitted addresses bullet 2 above, i.e. it reduces the number
of metrics-update invocations. Instead of updating the metrics after processing every event,
we only periodically check whether the metrics need to be updated, i.e. once every second
(configurable via controller.update.metrics.interval.ms, default 1000 ms).
> * If, after the previous metrics update, other events have changed the controller’s
state, the metrics will be updated one second later.

> * If no other events have been processed since the previous metrics update, the
update can be skipped.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message