kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject git commit: KAFKA-1055 BrokerTopicStats should distinguish between messages received and messages actually appended (i.e., not dropped for various reasons).
Date Sat, 25 Jan 2014 02:00:08 GMT
Updated Branches:
  refs/heads/trunk c9028ad8c -> 26a02c32d


KAFKA-1055 BrokerTopicStats should distinguish between messages received and messages actually
appended (i.e., not dropped for various reasons).


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/26a02c32
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/26a02c32
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/26a02c32

Branch: refs/heads/trunk
Commit: 26a02c32dda0222c6ab1ad897992558c1c1eab76
Parents: c9028ad
Author: Joel Koshy <jjkoshy@gmail.com>
Authored: Fri Jan 24 18:00:01 2014 -0800
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Fri Jan 24 18:00:01 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/KafkaApis.scala           | 5 ++++-
 core/src/main/scala/kafka/server/KafkaRequestHandler.scala | 1 +
 2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/26a02c32/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 29abc46..bd7940b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -240,6 +240,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val partitionAndData: Map[TopicAndPartition, MessageSet] = producerRequest.data
     trace("Append [%s] to local log ".format(partitionAndData.toString))
     partitionAndData.map {case (topicAndPartition, messages) =>
+      // update stats for incoming bytes rate
       BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
       BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
 
@@ -254,7 +255,9 @@ class KafkaApis(val requestChannel: RequestChannel,
           }
         val numAppendedMessages = if (info.firstOffset == -1L || info.lastOffset == -1L) 0 else (info.lastOffset - info.firstOffset + 1)
 
-        // update stats
+        // update stats for successfully appended messages
+        BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).logBytesAppendRate.mark(messages.sizeInBytes)
+        BrokerTopicStats.getBrokerAllTopicsStats.logBytesAppendRate.mark(messages.sizeInBytes)
         BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages)
         BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/26a02c32/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index d0f05cb..871212b 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -76,6 +76,7 @@ class BrokerTopicMetrics(name: String) extends KafkaMetricsGroup {
   val messagesInRate = newMeter(name + "MessagesInPerSec",  "messages", TimeUnit.SECONDS)
   val bytesInRate = newMeter(name + "BytesInPerSec",  "bytes", TimeUnit.SECONDS)
   val bytesOutRate = newMeter(name + "BytesOutPerSec",  "bytes", TimeUnit.SECONDS)
+  val logBytesAppendRate = newMeter(name + "LogBytesAppendedPerSec",  "bytes", TimeUnit.SECONDS)
   val failedProduceRequestRate = newMeter(name + "FailedProduceRequestsPerSec",  "requests", TimeUnit.SECONDS)
   val failedFetchRequestRate = newMeter(name + "FailedFetchRequestsPerSec",  "requests", TimeUnit.SECONDS)
 }


Mime
View raw message