kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6514) Add API version as a tag for the RequestsPerSec metric
Date Mon, 16 Apr 2018 17:17:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439737#comment-16439737
] 

ASF GitHub Bot commented on KAFKA-6514:
---------------------------------------

hachikuji closed pull request #4506: KAFKA-6514: Add API version as a tag for the RequestsPerSec
metric
URL: https://github.com/apache/kafka/pull/4506
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 8a17528bfb7..f03bdeb9dd9 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -158,7 +158,7 @@ object RequestChannel extends Logging {
       val metricNames = fetchMetricNames :+ header.apiKey.name
       metricNames.foreach { metricName =>
         val m = metrics(metricName)
-        m.requestRate.mark()
+        m.requestRate(header.apiVersion).mark()
         m.requestQueueTimeHist.update(Math.round(requestQueueTimeMs))
         m.localTimeHist.update(Math.round(apiLocalTimeMs))
         m.remoteTimeHist.update(Math.round(apiRemoteTimeMs))
@@ -350,10 +350,11 @@ object RequestMetrics {
 }
 
 class RequestMetrics(name: String) extends KafkaMetricsGroup {
+
   import RequestMetrics._
 
   val tags = Map("request" -> name)
-  val requestRate = newMeter(RequestsPerSec, "requests", TimeUnit.SECONDS, tags)
+  val requestRateInternal = new mutable.HashMap[Short, Meter]
   // time a request spent in a request queue
   val requestQueueTimeHist = newHistogram(RequestQueueTimeMs, biased = true, tags)
   // time a request takes to be processed at the local broker
@@ -386,6 +387,10 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
   private val errorMeters = mutable.Map[Errors, ErrorMeter]()
   Errors.values.foreach(error => errorMeters.put(error, new ErrorMeter(name, error)))
 
+  def requestRate(version: Short): Meter = {
+      requestRateInternal.getOrElseUpdate(version, newMeter("RequestsPerSec", "requests",
TimeUnit.SECONDS, tags + ("version" -> version.toString)))
+  }
+
   class ErrorMeter(name: String, error: Errors) {
     private val tags = Map("request" -> name, "error" -> error.name)
 
@@ -418,7 +423,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
   }
 
   def removeMetrics(): Unit = {
-    removeMetric(RequestsPerSec, tags)
+    for (version <- requestRateInternal.keySet) removeMetric(RequestsPerSec, tags + ("version"
-> version.toString))
     removeMetric(RequestQueueTimeMs, tags)
     removeMetric(LocalTimeMs, tags)
     removeMetric(RemoteTimeMs, tags)
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index e6dadbb4253..7d3b42897cc 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -688,10 +688,14 @@ class SocketServerTest extends JUnitSuite {
   @Test
   def testRequestMetricsAfterStop(): Unit = {
     server.stopProcessingRequests()
-
-    server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate.mark()
+    val version = ApiKeys.PRODUCE.latestVersion
+    val version2 = (version - 1).toShort
+    for (_ <- 0 to 1) server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).mark()
+    server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version2).mark()
+    assertEquals(2, server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).count())
     server.requestChannel.updateErrorMetrics(ApiKeys.PRODUCE, Map(Errors.NONE -> 1))
-    val nonZeroMeters = Map("kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce"
-> 1,
+    val nonZeroMeters = Map(s"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=$version"
-> 2,
+        s"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=$version2"
-> 1,
         "kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=Produce,error=NONE"
-> 1)
 
     def requestMetricMeters = YammerMetrics
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 95f2c418c3b..d369d1de79d 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -68,6 +68,12 @@ <h5><a id="upgrade_120_notable" href="#upgrade_120_notable">Notable
changes in 1
 <ul>
     <li><a href="https://cwiki.apache.org/confluence/x/oYtjB">KIP-186</a>
increases the default offset retention time from 1 day to 7 days. This makes it less likely
to "lose" offsets in an application that commits infrequently. It also increases the active
set of offsets and therefore can increase memory usage on the broker. Note that the console
consumer currently enables offset commit by default and can be the source of a large number
of offsets which this change will now preserve for 7 days instead of 1. You can preserve the
existing behavior by setting the broker config <code>offsets.retention.minutes</code>
to 1440.</li>
     <li><a href="https://issues.apache.org/jira/browse/KAFKA-5674">KAFKA-5674</a>
extends the lower interval of <code>max.connections.per.ip minimum</code> to zero
and therefore allows IP-based filtering of inbound connections.</li>
+    <li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-272%3A+Add+API+version+tag+to+broker%27s+RequestsPerSec+metric">KIP-272</a>
+        added API version tag to the metric <code>kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower|...}</code>.
+        This metric now becomes <code>kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower|...},version={0|1|2|3|...}</code>.
This will impact
+        JMX monitoring tools that do not automatically aggregate. To get the total count
for a specific request type, the tool needs to be
+        updated to aggregate across different versions.
+    </li>
     <li> New Kafka Streams configuration parameter <code>upgrade.from</code>
added that allows rolling bounce upgrade from older version. </li>
 </ul>
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Add API version as a tag for the RequestsPerSec metric
> ------------------------------------------------------
>
>                 Key: KAFKA-6514
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6514
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>    Affects Versions: 1.0.0
>            Reporter: Allen Wang
>            Priority: Major
>
> After we upgrade the broker to a new version, one important insight is how many clients
have also been upgraded, so that we can switch the message format once most of the clients
are on the new version and thereby minimize the performance penalty.
> RequestsPerSec with the version tag will give us that insight.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message