kafka-commits mailing list archives

From sriram...@apache.org
Subject git commit: KAFKA-930 leader auto rebalance feature; reviewed by Neha Narkhede, Jun Rao and Guozhang Wang
Date Fri, 20 Dec 2013 22:04:54 GMT
Updated Branches:
  refs/heads/trunk b5d16871c -> b23cf1968


KAFKA-930 leader auto rebalance feature; reviewed by Neha Narkhede, Jun Rao and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b23cf196
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b23cf196
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b23cf196

Branch: refs/heads/trunk
Commit: b23cf1968f2aa4f8dbd6629634ae813b5e14564c
Parents: b5d1687
Author: Sriram Subramanian <sriram.sub@gmail.com>
Authored: Fri Dec 20 14:04:23 2013 -0800
Committer: Sriram Subramanian <sriram.sub@gmail.com>
Committed: Fri Dec 20 14:04:23 2013 -0800

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 92 +++++++++++++++++++-
 .../main/scala/kafka/server/KafkaConfig.scala   | 12 +++
 2 files changed, 102 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

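In short: when auto.leader.rebalance.enable is set, the controller starts a dedicated single-threaded KafkaScheduler and periodically computes, for each broker, the fraction of partitions that name that broker as preferred replica (the head of the assigned replica list) but are currently led by a different broker. A worked example of the trigger condition (numbers illustrative, not from the patch): a broker that is preferred replica for 20 partitions but leads only 17 of them has an imbalance ratio of 3/20 = 0.15; against the default leader.imbalance.per.broker.percentage of 10, 0.15 > 10/100 = 0.10, so a preferred replica election is requested for those 3 partitions.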

http://git-wip-us.apache.org/repos/asf/kafka/blob/b23cf196/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 965d0e5..ca2f09b 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -21,14 +21,14 @@ import collection.immutable.Set
 import com.yammer.metrics.core.Gauge
 import java.lang.{IllegalStateException, Object}
 import java.util.concurrent.TimeUnit
-import kafka.admin.PreferredReplicaLeaderElectionCommand
+import kafka.admin.{AdminOperationException, PreferredReplicaLeaderElectionCommand}
 import kafka.api._
 import kafka.cluster.Broker
 import kafka.common._
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
 import kafka.utils.ZkUtils._
-import kafka.utils.{Json, Utils, ZkUtils, Logging}
+import kafka.utils.{Json, Utils, ZkUtils, Logging, KafkaScheduler}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
@@ -112,6 +112,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   private val replicaStateMachine = new ReplicaStateMachine(this)
   private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
     config.brokerId)
+  // have a separate scheduler for the controller to be able to start and stop independently of the
+  // kafka server
+  private val autoRebalanceScheduler = new KafkaScheduler(1)
   val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
@@ -140,6 +143,22 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     }
   )
 
+  newGauge(
+    "PreferredReplicaImbalanceCount",
+    new Gauge[Int] {
+      def value(): Int = {
+        controllerContext.controllerLock synchronized {
+          if (!isActive())
+            0
+          else
+            controllerContext.partitionReplicaAssignment.count {
+              case (topicPartition, replicas) => controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != replicas.head
+            }
+        }
+      }
+    }
+  )
+
   def epoch = controllerContext.epoch
 
   def clientId = "id_%d-host_%s-port_%d".format(config.brokerId, config.hostName, config.port)
@@ -250,6 +269,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       initializeAndMaybeTriggerPreferredReplicaElection()
       /* send partition leadership info to all live brokers */
       sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+      if (config.autoLeaderRebalanceEnable) {
+        info("starting the partition rebalance scheduler")
+        autoRebalanceScheduler.startup()
+        autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
+          5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS)
+      }
     }
     else
       info("Controller has been shut down, aborting startup/failover")
@@ -500,6 +525,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       isRunning = false
       partitionStateMachine.shutdown()
       replicaStateMachine.shutdown()
+      if (config.autoLeaderRebalanceEnable)
+        autoRebalanceScheduler.shutdown()
       if(controllerContext.controllerChannelManager != null) {
         controllerContext.controllerChannelManager.shutdown()
         controllerContext.controllerChannelManager = null
@@ -906,6 +933,67 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       }
     }
   }
+
+  private def checkAndTriggerPartitionRebalance(): Unit = {
+    if (isActive()) {
+      trace("checking need to trigger partition rebalance")
+      // get all the active brokers
+      var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
+      controllerContext.controllerLock synchronized {
+        preferredReplicasForTopicsByBrokers = controllerContext.partitionReplicaAssignment.groupBy {
+          case(topicAndPartition, assignedReplicas) => assignedReplicas.head
+        }
+      }
+      debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
+      // for each broker, check if a preferred replica election needs to be triggered
+      preferredReplicasForTopicsByBrokers.foreach {
+        case(leaderBroker, topicAndPartitionsForBroker) => {
+          var imbalanceRatio: Double = 0
+          var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
+          controllerContext.controllerLock synchronized {
+            topicsNotInPreferredReplica =
+              topicAndPartitionsForBroker.filter {
+                case(topicPartition, replicas) => {
+                  controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
+                }
+              }
+            debug("topics not in preferred replica " + topicsNotInPreferredReplica)
+            val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
+            val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
+            imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
+            trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
+          }
+          // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
+          // that need to be on this broker
+          if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
+            controllerContext.controllerLock synchronized {
+              // do this check only if the broker is live and there are no partitions being reassigned currently
+              // and preferred replica election is not in progress
+              if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
+                  controllerContext.partitionsBeingReassigned.size == 0 &&
+                  controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0) {
+                val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
+                val partitionsList = topicsNotInPreferredReplica.keys.map(e => Map("topic" -> e.topic, "partition" -> e.partition))
+                val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList))
+                try {
+                  ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
+                  info("Created preferred replica election path with %s".format(jsonData))
+                } catch {
+                  case e2: ZkNodeExistsException =>
+                    val partitionsUndergoingPreferredReplicaElection =
+                      PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
+                    error("Preferred replica leader election currently in progress for " +
+                          "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection));
+                  case e3: Throwable =>
+                    error("Error while trying to auto rebalance topics %s".format(topicsNotInPreferredReplica.keys))
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
 }
 
 /**

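For illustration, the data written above to ZkUtils.PreferredReplicaLeaderElectionPath uses the same JSON format that kafka.admin.PreferredReplicaLeaderElectionCommand already parses. For a hypothetical topic "foo" whose partitions 0 and 1 are not led by their preferred replica, Json.encode would produce roughly the following (field order may vary with Map iteration order):

    {"version":1,"partitions":[{"topic":"foo","partition":0},{"topic":"foo","partition":1}]}
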
http://git-wip-us.apache.org/repos/asf/kafka/blob/b23cf196/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index a7e5b73..3c3aafc 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -220,6 +220,18 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the purge interval (in number of requests) of the producer request purgatory */
   val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 10000)
 
+  /* Enables auto leader rebalancing. A background thread checks and triggers a leader
+   * rebalance at regular intervals if required */
+  val autoLeaderRebalanceEnable = props.getBoolean("auto.leader.rebalance.enable", false)
+
+  /* the ratio of leader imbalance allowed per broker. The controller triggers a leader rebalance if the imbalance
+   * goes above this value for a broker. The value is specified as a percentage. */
+  val leaderImbalancePerBrokerPercentage = props.getInt("leader.imbalance.per.broker.percentage", 10)
+
+  /* the frequency with which the partition rebalance check is triggered by the controller */
+  val leaderImbalanceCheckIntervalSeconds = props.getInt("leader.imbalance.check.interval.seconds", 300)
+
+
   /*********** Controlled shutdown configuration ***********/
 
   /** Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens */


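To opt a broker in to the new behavior, the three settings above go in server.properties; a minimal sketch (the enable flag defaults to false, and the other values shown are the defaults this patch adds):

    auto.leader.rebalance.enable=true
    leader.imbalance.per.broker.percentage=10
    leader.imbalance.check.interval.seconds=300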