kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-4441; Monitoring incorrect during topic creation and deletion
Date Mon, 06 Feb 2017 20:23:56 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 f4276b7cf -> 13a09557a


KAFKA-4441; Monitoring incorrect during topic creation and deletion

The OfflinePartitionsCount and PreferredReplicaImbalanceCount metrics now
ignore partitions of topics that are queued for deletion

Added integration test which polls the metrics while topics are being
created and deleted

Developed with mimaison

Author: Edoardo Comar <ecomar@uk.ibm.com>

Reviewers: Dong Lin <lindong28@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Jun
Rao <junrao@gmail.com>

Closes #2325 from edoardocomar/KAFKA-4441

(cherry picked from commit 48aec6ef1e8065bd14e54172d6443144fb80738b)
Signed-off-by: Jun Rao <junrao@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/13a09557
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/13a09557
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/13a09557

Branch: refs/heads/0.10.2
Commit: 13a09557a4949cb795bc2dec07a2ae488fd273cb
Parents: f4276b7
Author: Edoardo Comar <ecomar@uk.ibm.com>
Authored: Mon Feb 6 12:23:24 2017 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Feb 6 12:23:49 2017 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/cluster/Replica.scala |   5 +-
 .../kafka/controller/KafkaController.scala      |  10 +-
 ...MetricsDuringTopicCreationDeletionTest.scala | 161 +++++++++++++++++++
 3 files changed, 172 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/13a09557/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 346e5d6..8597b06 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -68,9 +68,9 @@ class Replica(val brokerId: Int,
    */
   def updateLogReadResult(logReadResult : LogReadResult) {
     if (logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.leaderLogEndOffset)
-      _lastCaughtUpTimeMs = logReadResult.fetchTimeMs
+      _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, logReadResult.fetchTimeMs)
     else if (logReadResult.info.fetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset)
-      _lastCaughtUpTimeMs = lastFetchTimeMs
+      _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, lastFetchTimeMs)
 
     logEndOffset = logReadResult.info.fetchOffsetMetadata
     lastFetchLeaderLogEndOffset = logReadResult.leaderLogEndOffset
@@ -130,6 +130,7 @@ class Replica(val brokerId: Int,
     replicaString.append("; Topic: " + partition.topic)
     replicaString.append("; Partition: " + partition.partitionId)
     replicaString.append("; isLocal: " + isLocal)
+    replicaString.append("; lastCaughtUpTimeMs: " + lastCaughtUpTimeMs)
     if (isLocal) replicaString.append("; Highwatermark: " + highWatermark)
     replicaString.toString
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/13a09557/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index e38adf8..774316b 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -188,7 +188,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val
brokerState
           if (!isActive)
             0
           else
-            controllerContext.partitionLeadershipInfo.count(p => !controllerContext.liveOrShuttingDownBrokerIds.contains(p._2.leaderAndIsr.leader))
+            controllerContext.partitionLeadershipInfo.count(p => 
+              (!controllerContext.liveOrShuttingDownBrokerIds.contains(p._2.leaderAndIsr.leader))
+              && (!deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic))
+            )
         }
       }
     }
@@ -203,7 +206,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val
brokerState
             0
           else
             controllerContext.partitionReplicaAssignment.count {
-              case (topicPartition, replicas) => controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader
!= replicas.head
+              case (topicPartition, replicas) => 
+                (controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader
!= replicas.head 
+                && (!deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic))
+                )
             }
         }
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/13a09557/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
new file mode 100644
index 0000000..19a0f9d
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.integration
+
+import java.util.Properties
+import kafka.server.KafkaConfig
+import kafka.utils.{Logging, TestUtils}
+import scala.collection.JavaConverters.mapAsScalaMapConverter
+
+import org.junit.{Before, Test}
+import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.Gauge
+
+
+class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with Logging
{
+
+  private val nodesNum = 3
+  private val topicName = "topic"
+  private val topicNum = 2
+  private val replicationFactor = 3
+  private val partitionNum = 3
+  private val createDeleteIterations = 3
+  
+  private val overridingProps = new Properties
+  overridingProps.put(KafkaConfig.DeleteTopicEnableProp, "true")
+  overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, "false")
+  // speed up the test for UnderReplicatedPartitions 
+  // which relies on the ISR expiry thread to execute concurrently with topic creation
+  overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, "2000") 
+
+  private val testedMetrics = List("OfflinePartitionsCount","PreferredReplicaImbalanceCount","UnderReplicatedPartitions")
+  private val topics = List.tabulate(topicNum) (n => topicName + n)
+
+  @volatile private var running = true
+  
+  override def generateConfigs() = TestUtils.createBrokerConfigs(nodesNum, zkConnect)
+    .map(KafkaConfig.fromProps(_, overridingProps))
+
+  @Before
+  override def setUp {
+    // Do some Metrics Registry cleanup by removing the metrics that this test checks. 
+    // This is a test workaround to the issue that prior harness runs may have left a populated
registry.
+    // see https://issues.apache.org/jira/browse/KAFKA-4605
+    for (m <- (testedMetrics)) {
+        Metrics.defaultRegistry.allMetrics.asScala
+        .filterKeys(k => k.getName.endsWith(m))
+        .headOption match {
+           case Some(e) => Metrics.defaultRegistry.removeMetric(e._1)
+           case None =>
+        }
+    }
+    
+    super.setUp
+  }
+
+  /*
+   * checking all the metrics we care about in a single test is faster, though it would be more elegant
to have 3 @Test methods
+   */
+  @Test
+  def testMetricsDuringTopicCreateDelete() {
+
+    // For UnderReplicatedPartitions, because of https://issues.apache.org/jira/browse/KAFKA-4605
+    // we can't access the metrics value of each server. So instead we directly invoke the
method 
+    // replicaManager.underReplicatedPartitionCount() that defines the metrics value.
+    @volatile var underReplicatedPartitionCount = 0
+
+    // For OfflinePartitionsCount and PreferredReplicaImbalanceCount even with https://issues.apache.org/jira/browse/KAFKA-4605
+    // the test has worked reliably because the metric that gets triggered is the one generated
by the first started server (controller)
+    val offlinePartitionsCountGauge = getGauge("OfflinePartitionsCount")
+    @volatile var offlinePartitionsCount = offlinePartitionsCountGauge.value
+    assert(offlinePartitionsCount == 0)
+
+    val preferredReplicaImbalanceCountGauge = getGauge("PreferredReplicaImbalanceCount")
+    @volatile var preferredReplicaImbalanceCount = preferredReplicaImbalanceCountGauge.value
+    assert(preferredReplicaImbalanceCount == 0)
+
+    // Thread checking the metric continuously
+    running = true
+    val thread = new Thread(new Runnable {
+      def run() {
+        while (running) {
+          for ( s <- servers if running) {
+            underReplicatedPartitionCount = s.replicaManager.underReplicatedPartitionCount
+            if (underReplicatedPartitionCount > 0) {
+              running = false
+            }
+          }
+
+          preferredReplicaImbalanceCount = preferredReplicaImbalanceCountGauge.value
+          if (preferredReplicaImbalanceCount > 0) {
+             running = false
+          }
+
+          offlinePartitionsCount = offlinePartitionsCountGauge.value
+          if (offlinePartitionsCount > 0) {
+             running = false
+          }
+        }
+      }
+    })
+    thread.start
+
+    // breakable loop that creates and deletes topics
+    createDeleteTopics()
+
+    // if the thread checking the gauge is still running, stop it
+    running = false;
+    thread.join
+    
+    assert(offlinePartitionsCount==0, "OfflinePartitionCount not 0: "+ offlinePartitionsCount)
+    assert(preferredReplicaImbalanceCount==0, "PreferredReplicaImbalanceCount not 0: " +
preferredReplicaImbalanceCount)
+    assert(underReplicatedPartitionCount==0, "UnderReplicatedPartitionCount not 0: " + underReplicatedPartitionCount)
+  }
+
+  private def getGauge(metricName: String) = {
+    Metrics.defaultRegistry.allMetrics.asScala
+           .filterKeys(k => k.getName.endsWith(metricName))
+           .headOption
+           .getOrElse { fail( "Unable to find metric " + metricName ) }
+           ._2.asInstanceOf[Gauge[Int]]
+  }
+  
+  private def createDeleteTopics() {
+    for (l <- 1 to createDeleteIterations if running) {
+      // Create topics
+      for (t <- topics if running) {
+        try {
+          kafka.admin.AdminUtils.createTopic(zkUtils, t, partitionNum, replicationFactor)
+        } catch {
+          case e: Exception => e.printStackTrace
+        }
+      }
+      Thread.sleep(500)
+
+      // Delete topics
+      for (t <- topics if running) {
+          try {
+              kafka.admin.AdminUtils.deleteTopic(zkUtils, t)
+          } catch {
+          case e: Exception => e.printStackTrace
+          }
+      }
+      Thread.sleep(500)
+    }
+  }
+}


Mime
View raw message