kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1293720 - /incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
Date Sat, 25 Feb 2012 23:08:02 GMT
Author: junrao
Date: Sat Feb 25 23:08:02 2012
New Revision: 1293720

URL: http://svn.apache.org/viewvc?rev=1293720&view=rev
Log:
add jmx beans in broker to track # of failed requests; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-283

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala?rev=1293720&r1=1293719&r2=1293720&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala Sat Feb 25 23:08:02 2012
@@ -75,6 +75,8 @@ private[kafka] class KafkaRequestHandler
     catch {
       case e =>
         error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e)
+        BrokerTopicStat.getBrokerTopicStat(request.topic).recordFailedProduceRequest
+        BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
         throw e
     }
   }
@@ -113,6 +115,8 @@ private[kafka] class KafkaRequestHandler
     catch {
       case e =>
         error("error when processing request " + fetchRequest, e)
+        BrokerTopicStat.getBrokerTopicStat(fetchRequest.topic).recordFailedFetchRequest
+        BrokerTopicStat.getBrokerAllTopicStat.recordFailedFetchRequest
         response=new MessageSetSend(MessageSet.Empty, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
     }
     response
@@ -132,6 +136,8 @@ trait BrokerTopicStatMBean {
   def getMessagesIn: Long
   def getBytesIn: Long
   def getBytesOut: Long
+  def getFailedProduceRequest: Long
+  def getFailedFetchRequest: Long
 }
 
 @threadsafe
@@ -139,6 +145,8 @@ class BrokerTopicStat extends BrokerTopi
   private val numCumulatedMessagesIn = new AtomicLong(0)
   private val numCumulatedBytesIn = new AtomicLong(0)
   private val numCumulatedBytesOut = new AtomicLong(0)
+  private val numCumulatedFailedProduceRequests = new AtomicLong(0)
+  private val numCumulatedFailedFetchRequests = new AtomicLong(0)
 
   def getMessagesIn: Long = numCumulatedMessagesIn.get
 
@@ -151,6 +159,14 @@ class BrokerTopicStat extends BrokerTopi
   def getBytesOut: Long = numCumulatedBytesOut.get
 
   def recordBytesOut(nBytes: Long) = numCumulatedBytesOut.getAndAdd(nBytes)
+
+  def recordFailedProduceRequest = numCumulatedFailedProduceRequests.getAndIncrement
+
+  def getFailedProduceRequest = numCumulatedFailedProduceRequests.get()
+
+  def recordFailedFetchRequest = numCumulatedFailedFetchRequests.getAndIncrement
+
+  def getFailedFetchRequest = numCumulatedFailedFetchRequests.get()
 }
 
 object BrokerTopicStat extends Logging {



Mime
View raw message