kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-5738; Add cumulative count for rate metrics (KIP-187)
Date Thu, 14 Sep 2017 23:07:42 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk dfd625daa -> 8a5e86660


KAFKA-5738; Add cumulative count for rate metrics (KIP-187)

Implementation of https://cwiki.apache.org/confluence/display/KAFKA/KIP-187+-+Add+cumulative+count+metric+for+all+Kafka+rate+metrics
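
The pattern applied throughout the patch: sensors that previously registered only a `Rate` now register a `Meter`, which exposes the existing `*-rate` metric together with a new cumulative `*-total` metric. A minimal sketch, assuming a `Metrics` instance named `metrics` and using made-up sensor and metric names (the real names are in the diff below):

    // Before: only a rate stat is registered on the sensor.
    Sensor sensor = metrics.sensor("example");
    sensor.add(metrics.metricName("example-rate", "example-group",
            "The number of example events per second"), new Rate(new Count()));

    // After: a Meter registers the rate and its cumulative total together.
    sensor.add(new Meter(new Count(),
            metrics.metricName("example-rate", "example-group",
                    "The number of example events per second"),
            metrics.metricName("example-total", "example-group",
                    "The total number of example events")));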

Also made locking in Sensor for `CompoundStat` consistent with simple `Stat`, avoiding locking the whole sensor.
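
Concretely (see the Sensor.java hunk below), the metrics backing a `CompoundStat` are now created with a dedicated lock object rather than with the `Sensor` instance itself, matching how metrics for a simple `Stat` are created. A sketch of the relevant lines, with the effect noted in comments:

    // Before: metric reads synchronized on the Sensor, contending with the
    // synchronized Sensor.record() path.
    KafkaMetric metric = new KafkaMetric(this, m.name(), m.stat(),
            config == null ? this.config : config, time);

    // After: one lock object shared only by the metrics of this compound stat.
    Object lock = new Object();
    KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(),
            config == null ? this.config : config, time);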

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3686 from rajinisivaram/KAFKA-5738


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8a5e8666
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8a5e8666
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8a5e8666

Branch: refs/heads/trunk
Commit: 8a5e86660593eab49c64fdfb5ef090634ae5ae06
Parents: dfd625d
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Fri Sep 15 00:06:08 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Sep 15 00:07:17 2017 +0100

----------------------------------------------------------------------
 .../consumer/internals/AbstractCoordinator.java | 23 +++---
 .../consumer/internals/ConsumerCoordinator.java |  6 +-
 .../clients/consumer/internals/Fetcher.java     | 19 +++--
 .../internals/FetcherMetricsRegistry.java       | 20 +++++
 .../clients/producer/internals/BufferPool.java  |  9 +-
 .../producer/internals/RecordAccumulator.java   |  7 +-
 .../clients/producer/internals/Sender.java      | 44 ++++++----
 .../internals/SenderMetricsRegistry.java        | 16 ++++
 .../org/apache/kafka/common/metrics/Sensor.java |  3 +-
 .../kafka/common/metrics/stats/Meter.java       | 82 ++++++++++++++++++
 .../apache/kafka/common/network/Selector.java   | 87 ++++++++++++--------
 .../producer/internals/BufferPoolTest.java      |  6 +-
 .../kafka/common/metrics/MetricsTest.java       | 20 +++--
 .../kafka/common/metrics/stats/MeterTest.java   | 73 ++++++++++++++++
 .../main/scala/kafka/network/SocketServer.scala |  5 +-
 .../kafka/api/PlaintextConsumerTest.scala       | 51 +++++++++++-
 .../processor/internals/StreamThread.java       | 26 ++++--
 .../processor/internals/StreamsMetricsImpl.java | 13 ++-
 .../apache/kafka/streams/KafkaStreamsTest.java  |  4 +-
 .../internals/StreamsMetricsImplTest.java       | 11 ++-
 20 files changed, 416 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8a5e8666/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index dcf837b..bf75242 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -33,7 +33,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Meter;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
@@ -813,6 +813,14 @@ public abstract class AbstractCoordinator implements Closeable {
 
     }
 
+    protected Meter createMeter(Metrics metrics, String groupName, String baseName, String descriptiveName) {
+        return new Meter(new Count(),
+                metrics.metricName(baseName + "-rate", groupName,
+                        String.format("The number of %s per second", descriptiveName)),
+                metrics.metricName(baseName + "-total", groupName,
+                        String.format("The total number of %s", descriptiveName)));
+    }
+
     private class GroupCoordinatorMetrics {
         public final String metricGrpName;
 
@@ -827,9 +835,7 @@ public abstract class AbstractCoordinator implements Closeable {
             this.heartbeatLatency.add(metrics.metricName("heartbeat-response-time-max",
                 this.metricGrpName,
                 "The max time taken to receive a response to a heartbeat request"), new Max());
-            this.heartbeatLatency.add(metrics.metricName("heartbeat-rate",
-                this.metricGrpName,
-                "The average number of heartbeats per second"), new Rate(new Count()));
+            this.heartbeatLatency.add(createMeter(metrics, metricGrpName, "heartbeat", "heartbeats"));
 
             this.joinLatency = metrics.sensor("join-latency");
             this.joinLatency.add(metrics.metricName("join-time-avg",
@@ -838,9 +844,8 @@ public abstract class AbstractCoordinator implements Closeable {
             this.joinLatency.add(metrics.metricName("join-time-max",
                     this.metricGrpName,
                     "The max time taken for a group rejoin"), new Max());
-            this.joinLatency.add(metrics.metricName("join-rate",
-                    this.metricGrpName,
-                    "The number of group joins per second"), new Rate(new Count()));
+            this.joinLatency.add(createMeter(metrics, metricGrpName, "join", "group joins"));
+
 
             this.syncLatency = metrics.sensor("sync-latency");
             this.syncLatency.add(metrics.metricName("sync-time-avg",
@@ -849,9 +854,7 @@ public abstract class AbstractCoordinator implements Closeable {
             this.syncLatency.add(metrics.metricName("sync-time-max",
                     this.metricGrpName,
                     "The max time taken for a group sync"), new Max());
-            this.syncLatency.add(metrics.metricName("sync-rate",
-                    this.metricGrpName,
-                    "The number of group syncs per second"), new Rate(new Count()));
+            this.syncLatency.add(createMeter(metrics, metricGrpName, "sync", "group syncs"));
 
             Measurable lastHeartbeat =
                 new Measurable() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a5e8666/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 3a80d06..e740ba7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -38,9 +38,7 @@ import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
@@ -891,9 +889,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             this.commitLatency.add(metrics.metricName("commit-latency-max",
                 this.metricGrpName,
                 "The max time taken for a commit request"), new Max());
-            this.commitLatency.add(metrics.metricName("commit-rate",
-                this.metricGrpName,
-                "The number of commit calls per second"), new Rate(new Count()));
+            this.commitLatency.add(createMeter(metrics, metricGrpName, "commit", "commit calls"));
 
             Measurable numParts =
                 new Measurable() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a5e8666/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 01db34f..4c68f1f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -43,7 +43,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Meter;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.BufferSupplier;
@@ -1260,16 +1260,19 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
             this.bytesFetched = metrics.sensor("bytes-fetched");
             this.bytesFetched.add(metrics.metricInstance(metricsRegistry.fetchSizeAvg), new Avg());
             this.bytesFetched.add(metrics.metricInstance(metricsRegistry.fetchSizeMax), new Max());
-            this.bytesFetched.add(metrics.metricInstance(metricsRegistry.bytesConsumedRate), new Rate());
+            this.bytesFetched.add(new Meter(metrics.metricInstance(metricsRegistry.bytesConsumedRate),
+                    metrics.metricInstance(metricsRegistry.bytesConsumedTotal)));
 
             this.recordsFetched = metrics.sensor("records-fetched");
             this.recordsFetched.add(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg), new Avg());
-            this.recordsFetched.add(metrics.metricInstance(metricsRegistry.recordsConsumedRate), new Rate());
+            this.recordsFetched.add(new Meter(metrics.metricInstance(metricsRegistry.recordsConsumedRate),
+                    metrics.metricInstance(metricsRegistry.recordsConsumedTotal)));
 
             this.fetchLatency = metrics.sensor("fetch-latency");
             this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchLatencyAvg), new Avg());
             this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchLatencyMax), new Max());
-            this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchRequestRate), new Rate(new Count()));
+            this.fetchLatency.add(new Meter(new Count(), metrics.metricInstance(metricsRegistry.fetchRequestRate),
+                    metrics.metricInstance(metricsRegistry.fetchRequestTotal)));
 
             this.recordsFetchLag = metrics.sensor("records-lag");
             this.recordsFetchLag.add(metrics.metricInstance(metricsRegistry.recordsLagMax), new Max());
@@ -1287,8 +1290,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                         metricTags), new Avg());
                 bytesFetched.add(this.metrics.metricInstance(metricsRegistry.topicFetchSizeMax,
                         metricTags), new Max());
-                bytesFetched.add(this.metrics.metricInstance(metricsRegistry.topicBytesConsumedRate,
-                        metricTags), new Rate());
+                bytesFetched.add(new Meter(this.metrics.metricInstance(metricsRegistry.topicBytesConsumedRate, metricTags),
+                        this.metrics.metricInstance(metricsRegistry.topicBytesConsumedTotal, metricTags)));
             }
             bytesFetched.record(bytes);
 
@@ -1302,8 +1305,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                 recordsFetched = this.metrics.sensor(name);
                 recordsFetched.add(this.metrics.metricInstance(metricsRegistry.topicRecordsPerRequestAvg,
                         metricTags), new Avg());
-                recordsFetched.add(this.metrics.metricInstance(metricsRegistry.topicRecordsConsumedRate,
-                        metricTags), new Rate());
+                recordsFetched.add(new Meter(this.metrics.metricInstance(metricsRegistry.topicRecordsConsumedRate, metricTags),
+                        this.metrics.metricInstance(metricsRegistry.topicRecordsConsumedTotal, metricTags)));
             }
             recordsFetched.record(records);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a5e8666/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
index 0c98342..acf42ec 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
@@ -28,19 +28,24 @@ public class FetcherMetricsRegistry {
     public MetricNameTemplate fetchSizeAvg;
     public MetricNameTemplate fetchSizeMax;
     public MetricNameTemplate bytesConsumedRate;
+    public MetricNameTemplate bytesConsumedTotal;
     public MetricNameTemplate recordsPerRequestAvg;
     public MetricNameTemplate recordsConsumedRate;
+    public MetricNameTemplate recordsConsumedTotal;
     public MetricNameTemplate fetchLatencyAvg;
     public MetricNameTemplate fetchLatencyMax;
     public MetricNameTemplate fetchRequestRate;
+    public MetricNameTemplate fetchRequestTotal;
     public MetricNameTemplate recordsLagMax;
     public MetricNameTemplate fetchThrottleTimeAvg;
     public MetricNameTemplate fetchThrottleTimeMax;
     public MetricNameTemplate topicFetchSizeAvg;
     public MetricNameTemplate topicFetchSizeMax;
     public MetricNameTemplate topicBytesConsumedRate;
+    public MetricNameTemplate topicBytesConsumedTotal;
     public MetricNameTemplate topicRecordsPerRequestAvg;
     public MetricNameTemplate topicRecordsConsumedRate;
+    public MetricNameTemplate topicRecordsConsumedTotal;
     public MetricNameTemplate partitionRecordsLag;
     public MetricNameTemplate partitionRecordsLagMax;
     public MetricNameTemplate partitionRecordsLagAvg;
@@ -65,11 +70,15 @@ public class FetcherMetricsRegistry {
                 "The maximum number of bytes fetched per request", tags);
         this.bytesConsumedRate = new MetricNameTemplate("bytes-consumed-rate", groupName, 
                 "The average number of bytes consumed per second", tags);
+        this.bytesConsumedTotal = new MetricNameTemplate("bytes-consumed-total", groupName,
+                "The total number of bytes consumed", tags);
 
         this.recordsPerRequestAvg = new MetricNameTemplate("records-per-request-avg", groupName, 
                 "The average number of records in each request", tags);
         this.recordsConsumedRate = new MetricNameTemplate("records-consumed-rate", groupName, 
                 "The average number of records consumed per second", tags);
+        this.recordsConsumedTotal = new MetricNameTemplate("records-consumed-total", groupName,
+                "The total number of records consumed", tags);
 
         this.fetchLatencyAvg = new MetricNameTemplate("fetch-latency-avg", groupName, 
                 "The average time taken for a fetch request.", tags);
@@ -77,6 +86,8 @@ public class FetcherMetricsRegistry {
                 "The max time taken for any fetch request.", tags);
         this.fetchRequestRate = new MetricNameTemplate("fetch-rate", groupName, 
                 "The number of fetch requests per second.", tags);
+        this.fetchRequestTotal = new MetricNameTemplate("fetch-total", groupName,
+                "The total number of fetch requests.", tags);
 
         this.recordsLagMax = new MetricNameTemplate("records-lag-max", groupName, 
                 "The maximum lag in terms of number of records for any partition in this window", tags);
@@ -96,11 +107,15 @@ public class FetcherMetricsRegistry {
                 "The maximum number of bytes fetched per request for a topic", topicTags);
         this.topicBytesConsumedRate = new MetricNameTemplate("bytes-consumed-rate", groupName, 
                 "The average number of bytes consumed per second for a topic", topicTags);
+        this.topicBytesConsumedTotal = new MetricNameTemplate("bytes-consumed-total", groupName,
+                "The total number of bytes consumed for a topic", topicTags);
 
         this.topicRecordsPerRequestAvg = new MetricNameTemplate("records-per-request-avg", groupName, 
                 "The average number of records in each request for a topic", topicTags);
         this.topicRecordsConsumedRate = new MetricNameTemplate("records-consumed-rate", groupName, 
                 "The average number of records consumed per second for a topic", topicTags);
+        this.topicRecordsConsumedTotal = new MetricNameTemplate("records-consumed-total", groupName,
+                "The total number of records consumed for a topic", topicTags);
         
         /***** Partition level *****/
         this.partitionRecordsLag = new MetricNameTemplate("{topic}-{partition}.records-lag", groupName, 
@@ -118,19 +133,24 @@ public class FetcherMetricsRegistry {
             fetchSizeAvg,
             fetchSizeMax,
             bytesConsumedRate,
+            bytesConsumedTotal,
             recordsPerRequestAvg,
             recordsConsumedRate,
+            recordsConsumedTotal,
             fetchLatencyAvg,
             fetchLatencyMax,
             fetchRequestRate,
+            fetchRequestTotal,
             recordsLagMax,
             fetchThrottleTimeAvg,
             fetchThrottleTimeMax,
             topicFetchSizeAvg,
             topicFetchSizeMax,
             topicBytesConsumedRate,
+            topicBytesConsumedTotal,
             topicRecordsPerRequestAvg,
             topicRecordsConsumedRate,
+            topicRecordsConsumedTotal,
             partitionRecordsLag,
             partitionRecordsLagAvg,
             partitionRecordsLagMax

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a5e8666/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index 019201f..c5df2da 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -27,7 +27,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Meter;
 import org.apache.kafka.common.utils.Time;
 
 
@@ -75,10 +75,13 @@ public class BufferPool {
         this.metrics = metrics;
         this.time = time;
         this.waitTime = this.metrics.sensor(WAIT_TIME_SENSOR_NAME);
-        MetricName metricName = metrics.metricName("bufferpool-wait-ratio",
+        MetricName rateMetricName = metrics.metricName("bufferpool-wait-ratio",
                                                    metricGrpName,
                                                    "The fraction of time an appender waits for space allocation.");
-        this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
+        MetricName totalMetricName = metrics.metricName("bufferpool-wait-time-total",
+                                                   metricGrpName,
+                                                   "The total time an appender waits for space allocation.");
+        this.waitTime.add(new Meter(TimeUnit.NANOSECONDS, rateMetricName, totalMetricName));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a5e8666/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 38b5e51..72d3b29 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -29,7 +29,7 @@ import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Meter;
 import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.CompressionRatioEstimator;
 import org.apache.kafka.common.record.CompressionType;
@@ -159,8 +159,9 @@ public final class RecordAccumulator {
         metrics.addMetric(metricName, availableBytes);
 
         Sensor bufferExhaustedRecordSensor = metrics.sensor("buffer-exhausted-records");
-        metricName = metrics.metricName("buffer-exhausted-rate", metricGrpName, "The average per-second number of record sends that are dropped due to buffer exhaustion");
-        bufferExhaustedRecordSensor.add(metricName, new Rate());
+        MetricName rateMetricName = metrics.metricName("buffer-exhausted-rate", metricGrpName, "The average per-second number of record sends that are dropped due to buffer exhaustion");
+        MetricName totalMetricName = metrics.metricName("buffer-exhausted-total", metricGrpName, "The total number of record sends that are dropped due to buffer exhaustion");
+        bufferExhaustedRecordSensor.add(new Meter(rateMetricName, totalMetricName));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a5e8666/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8da411c..466bdd5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -43,7 +43,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Meter;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.RecordBatch;
@@ -725,18 +725,21 @@ public class Sender implements Runnable {
             this.requestTimeSensor.add(m, new Max());
 
             this.recordsPerRequestSensor = metrics.sensor("records-per-request");
-            m = metrics.metricInstance(metricsRegistry.recordSendRate);
-            this.recordsPerRequestSensor.add(m, new Rate());
+            MetricName rateMetricName = metrics.metricInstance(metricsRegistry.recordSendRate);
+            MetricName totalMetricName = metrics.metricInstance(metricsRegistry.recordSendTotal);
+            this.recordsPerRequestSensor.add(new Meter(rateMetricName, totalMetricName));
             m = metrics.metricInstance(metricsRegistry.recordsPerRequestAvg);
             this.recordsPerRequestSensor.add(m, new Avg());
 
             this.retrySensor = metrics.sensor("record-retries");
-            m = metrics.metricInstance(metricsRegistry.recordRetryRate);
-            this.retrySensor.add(m, new Rate());
+            rateMetricName = metrics.metricInstance(metricsRegistry.recordRetryRate);
+            totalMetricName = metrics.metricInstance(metricsRegistry.recordRetryTotal);
+            this.retrySensor.add(new Meter(rateMetricName, totalMetricName));
 
             this.errorSensor = metrics.sensor("errors");
-            m = metrics.metricInstance(metricsRegistry.recordErrorRate);
-            this.errorSensor.add(m, new Rate());
+            rateMetricName = metrics.metricInstance(metricsRegistry.recordErrorRate);
+            totalMetricName = metrics.metricInstance(metricsRegistry.recordErrorTotal);
+            this.errorSensor.add(new Meter(rateMetricName, totalMetricName));
 
             this.maxRecordSizeSensor = metrics.sensor("record-size");
             m = metrics.metricInstance(metricsRegistry.recordSizeMax);
@@ -758,8 +761,9 @@ public class Sender implements Runnable {
             });
 
             this.batchSplitSensor = metrics.sensor("batch-split-rate");
-            m = metrics.metricInstance(metricsRegistry.batchSplitRate);
-            this.batchSplitSensor.add(m, new Rate());
+            rateMetricName = metrics.metricInstance(metricsRegistry.batchSplitRate);
+            totalMetricName = metrics.metricInstance(metricsRegistry.batchSplitTotal);
+            this.batchSplitSensor.add(new Meter(rateMetricName, totalMetricName));
         }
 
         private void maybeRegisterTopicMetrics(String topic) {
@@ -771,28 +775,32 @@ public class Sender implements Runnable {
                 Map<String, String> metricTags = Collections.singletonMap("topic", topic);
 
                 topicRecordCount = this.metrics.sensor(topicRecordsCountName);
-                MetricName m = this.metrics.metricInstance(metricsRegistry.topicRecordSendRate, metricTags);
-                topicRecordCount.add(m, new Rate());
+                MetricName rateMetricName = this.metrics.metricInstance(metricsRegistry.topicRecordSendRate, metricTags);
+                MetricName totalMetricName = this.metrics.metricInstance(metricsRegistry.topicRecordSendTotal, metricTags);
+                topicRecordCount.add(new Meter(rateMetricName, totalMetricName));
 
                 String topicByteRateName = "topic." + topic + ".bytes";
                 Sensor topicByteRate = this.metrics.sensor(topicByteRateName);
-                m = this.metrics.metricInstance(metricsRegistry.topicByteRate, metricTags);
-                topicByteRate.add(m, new Rate());
+                rateMetricName = this.metrics.metricInstance(metricsRegistry.topicByteRate, metricTags);
+                totalMetricName = this.metrics.metricInstance(metricsRegistry.topicByteTotal, metricTags);
+                topicByteRate.add(new Meter(rateMetricName, totalMetricName));
 
                 String topicCompressionRateName = "topic." + topic + ".compression-rate";
                 Sensor topicCompressionRate = this.metrics.sensor(topicCompressionRateName);
-                m = this.metrics.metricInstance(metricsRegistry.topicCompressionRate, metricTags);
+                MetricName m = this.metrics.metricInstance(metricsRegistry.topicCompressionRate, metricTags);
                 topicCompressionRate.add(m, new Avg());
 
                 String topicRetryName = "topic." + topic + ".record-retries";
                 Sensor topicRetrySensor = this.metrics.sensor(topicRetryName);
-                m = this.metrics.metricInstance(metricsRegistry.topicRecordRetryRate, metricTags);
-                topicRetrySensor.add(m, new Rate());
+                rateMetricName = this.metrics.metricInstance(metricsRegistry.topicRecordRetryRate, metricTags);
+                totalMetricName = this.metrics.metricInstance(metricsRegistry.topicRecordRetryTotal, metricTags);
+                topicRetrySensor.add(new Meter(rateMetricName, totalMetricName));
 
                 String topicErrorName = "topic." + topic + ".record-errors";
                 Sensor topicErrorSensor = this.metrics.sensor(topicErrorName);
-                m = this.metrics.metricInstance(metricsRegistry.topicRecordErrorRate, metricTags);
-                topicErrorSensor.add(m, new Rate());
+                rateMetricName = this.metrics.metricInstance(metricsRegistry.topicRecordErrorRate, metricTags);
+                totalMetricName = this.metrics.metricInstance(metricsRegistry.topicRecordErrorTotal, metricTags);
+                topicErrorSensor.add(new Meter(rateMetricName, totalMetricName));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a5e8666/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
index f29d319..9e014a7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
@@ -38,19 +38,27 @@ public class SenderMetricsRegistry {
     public MetricNameTemplate produceThrottleTimeAvg;
     public MetricNameTemplate produceThrottleTimeMax;
     public MetricNameTemplate recordSendRate;
+    public MetricNameTemplate recordSendTotal;
     public MetricNameTemplate recordsPerRequestAvg;
     public MetricNameTemplate recordRetryRate;
+    public MetricNameTemplate recordRetryTotal;
     public MetricNameTemplate recordErrorRate;
+    public MetricNameTemplate recordErrorTotal;
     public MetricNameTemplate recordSizeMax;
     public MetricNameTemplate recordSizeAvg;
     public MetricNameTemplate requestsInFlight;
     public MetricNameTemplate metadataAge;
     public MetricNameTemplate topicRecordSendRate;
+    public MetricNameTemplate topicRecordSendTotal;
     public MetricNameTemplate topicByteRate;
+    public MetricNameTemplate topicByteTotal;
     public MetricNameTemplate topicCompressionRate;
     public MetricNameTemplate topicRecordRetryRate;
+    public MetricNameTemplate topicRecordRetryTotal;
     public MetricNameTemplate topicRecordErrorRate;
+    public MetricNameTemplate topicRecordErrorTotal;
     public MetricNameTemplate batchSplitRate;
+    public MetricNameTemplate batchSplitTotal;
 
     public SenderMetricsRegistry() {
         this(new HashSet<String>());
@@ -68,14 +76,18 @@ public class SenderMetricsRegistry {
         this.requestLatencyAvg = new MetricNameTemplate("request-latency-avg", METRIC_GROUP_NAME, "The average request latency in ms", tags);
         this.requestLatencyMax = new MetricNameTemplate("request-latency-max", METRIC_GROUP_NAME, "The maximum request latency in ms", tags);
         this.recordSendRate = new MetricNameTemplate("record-send-rate", METRIC_GROUP_NAME, "The average number of records sent per second.", tags);
+        this.recordSendTotal = new MetricNameTemplate("record-send-total", METRIC_GROUP_NAME, "The total number of records sent.", tags);
         this.recordsPerRequestAvg = new MetricNameTemplate("records-per-request-avg", METRIC_GROUP_NAME, "The average number of records per request.", tags);
         this.recordRetryRate = new MetricNameTemplate("record-retry-rate", METRIC_GROUP_NAME, "The average per-second number of retried record sends", tags);
+        this.recordRetryTotal = new MetricNameTemplate("record-retry-total", METRIC_GROUP_NAME, "The total number of retried record sends", tags);
         this.recordErrorRate = new MetricNameTemplate("record-error-rate", METRIC_GROUP_NAME, "The average per-second number of record sends that resulted in errors", tags);
+        this.recordErrorTotal = new MetricNameTemplate("record-error-total", METRIC_GROUP_NAME, "The total number of record sends that resulted in errors", tags);
         this.recordSizeMax = new MetricNameTemplate("record-size-max", METRIC_GROUP_NAME, "The maximum record size", tags);
         this.recordSizeAvg = new MetricNameTemplate("record-size-avg", METRIC_GROUP_NAME, "The average record size", tags);
         this.requestsInFlight = new MetricNameTemplate("requests-in-flight", METRIC_GROUP_NAME, "The current number of in-flight requests awaiting a response.", tags);
         this.metadataAge = new MetricNameTemplate("metadata-age", METRIC_GROUP_NAME, "The age in seconds of the current producer metadata being used.", tags);
         this.batchSplitRate = new MetricNameTemplate("batch-split-rate", METRIC_GROUP_NAME, "The average number of batch splits per second", tags);
+        this.batchSplitTotal = new MetricNameTemplate("batch-split-total", METRIC_GROUP_NAME, "The total number of batch splits", tags);
 
         this.produceThrottleTimeAvg = new MetricNameTemplate("produce-throttle-time-avg", METRIC_GROUP_NAME, "The average time in ms a request was throttled by a broker", tags);
         this.produceThrottleTimeMax = new MetricNameTemplate("produce-throttle-time-max", METRIC_GROUP_NAME, "The maximum time in ms a request was throttled by a broker", tags);
@@ -85,10 +97,14 @@ public class SenderMetricsRegistry {
         topicTags.add("topic");
 
         this.topicRecordSendRate = new MetricNameTemplate("record-send-rate", TOPIC_METRIC_GROUP_NAME, "The average number of records sent per second for a topic.", topicTags);
+        this.topicRecordSendTotal = new MetricNameTemplate("record-send-total", TOPIC_METRIC_GROUP_NAME, "The total number of records sent for a topic.", topicTags);
         this.topicByteRate = new MetricNameTemplate("byte-rate", TOPIC_METRIC_GROUP_NAME, "The average number of bytes sent per second for a topic.", topicTags);
+        this.topicByteTotal = new MetricNameTemplate("byte-total", TOPIC_METRIC_GROUP_NAME, "The total number of bytes sent for a topic.", topicTags);
         this.topicCompressionRate = new MetricNameTemplate("compression-rate", TOPIC_METRIC_GROUP_NAME, "The average compression rate of record batches for a topic.", topicTags);
         this.topicRecordRetryRate = new MetricNameTemplate("record-retry-rate", TOPIC_METRIC_GROUP_NAME, "The average per-second number of retried record sends for a topic", topicTags);
+        this.topicRecordRetryTotal = new MetricNameTemplate("record-retry-total", TOPIC_METRIC_GROUP_NAME, "The total number of retried record sends for a topic", topicTags);
         this.topicRecordErrorRate = new MetricNameTemplate("record-error-rate", TOPIC_METRIC_GROUP_NAME, "The average per-second number of record sends that resulted in errors for a topic", topicTags);
+        this.topicRecordErrorTotal = new MetricNameTemplate("record-error-total", TOPIC_METRIC_GROUP_NAME, "The total number of record sends that resulted in errors for a topic", topicTags);
 
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a5e8666/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index ae331e7..55d75b1 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -224,8 +224,9 @@ public final class Sensor {
      */
     public synchronized void add(CompoundStat stat, MetricConfig config) {
         this.stats.add(Utils.notNull(stat));
+        Object lock = new Object();
         for (NamedMeasurable m : stat.stats()) {
-            KafkaMetric metric = new KafkaMetric(this, m.name(), m.stat(), config == null ? this.config : config, time);
+            KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), config == null ? this.config : config, time);
             this.registry.registerMetric(metric);
             this.metrics.add(metric);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a5e8666/clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java
new file mode 100644
index 0000000..09263ce
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.CompoundStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.stats.Rate.SampledTotal;
+
+
+/**
+ * A compound stat that includes a rate metric and a cumulative total metric.
+ */
+public class Meter implements CompoundStat {
+
+    private final MetricName rateMetricName;
+    private final MetricName totalMetricName;
+    private final Rate rate;
+    private final Total total;
+
+    /**
+     * Construct a Meter with seconds as time unit and {@link SampledTotal} stats for Rate
+     */
+    public Meter(MetricName rateMetricName, MetricName totalMetricName) {
+        this(TimeUnit.SECONDS, new SampledTotal(), rateMetricName, totalMetricName);
+    }
+
+    /**
+     * Construct a Meter with provided time unit and {@link SampledTotal} stats for Rate
+     */
+    public Meter(TimeUnit unit, MetricName rateMetricName, MetricName totalMetricName) {
+        this(unit, new SampledTotal(), rateMetricName, totalMetricName);
+    }
+
+    /**
+     * Construct a Meter with seconds as time unit and provided {@link SampledStat} stats for Rate
+     */
+    public Meter(SampledStat rateStat, MetricName rateMetricName, MetricName totalMetricName) {
+        this(TimeUnit.SECONDS, rateStat, rateMetricName, totalMetricName);
+    }
+
+    /**
+     * Construct a Meter with provided time unit and provided {@link SampledStat} stats for Rate
+     */
+    public Meter(TimeUnit unit, SampledStat rateStat, MetricName rateMetricName, MetricName totalMetricName) {
+        this.total = new Total();
+        this.rate = new Rate(unit, rateStat);
+        this.rateMetricName = rateMetricName;
+        this.totalMetricName = totalMetricName;
+    }
+
+    @Override
+    public List<NamedMeasurable> stats() {
+        return Arrays.asList(
+            new NamedMeasurable(totalMetricName, total),
+            new NamedMeasurable(rateMetricName, rate));
+    }
+
+    @Override
+    public void record(MetricConfig config, double value, long timeMs) {
+        rate.record(config, value, timeMs);
+        total.record(config, value, timeMs);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a5e8666/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 23dbc0f..7977879 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -45,9 +45,10 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.SampledStat;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
@@ -860,46 +861,44 @@ public class Selector implements Selectable, AutoCloseable {
             }
 
             this.connectionClosed = sensor("connections-closed:" + tagsSuffix.toString());
-            MetricName metricName = metrics.metricName("connection-close-rate", metricGrpName, "Connections closed per second in the window.", metricTags);
-            this.connectionClosed.add(metricName, new Rate());
+            this.connectionClosed.add(createMeter(metrics, metricGrpName, metricTags,
+                    "connection-close", "connections closed"));
 
             this.connectionCreated = sensor("connections-created:" + tagsSuffix.toString());
-            metricName = metrics.metricName("connection-creation-rate", metricGrpName, "New connections established per second in the window.", metricTags);
-            this.connectionCreated.add(metricName, new Rate());
+            this.connectionCreated.add(createMeter(metrics, metricGrpName, metricTags,
+                    "connection-creation", "new connections established"));
 
             this.bytesTransferred = sensor("bytes-sent-received:" + tagsSuffix.toString());
-            metricName = metrics.metricName("network-io-rate", metricGrpName, "The average number of network operations (reads or writes) on all connections per second.", metricTags);
-            bytesTransferred.add(metricName, new Rate(new Count()));
+            bytesTransferred.add(createMeter(metrics, metricGrpName, metricTags, new Count(),
+                    "network-io", "network operations (reads or writes) on all connections"));
 
             this.bytesSent = sensor("bytes-sent:" + tagsSuffix.toString(), bytesTransferred);
-            metricName = metrics.metricName("outgoing-byte-rate", metricGrpName, "The average number of outgoing bytes sent per second to all servers.", metricTags);
-            this.bytesSent.add(metricName, new Rate());
-            metricName = metrics.metricName("request-rate", metricGrpName, "The average number of requests sent per second.", metricTags);
-            this.bytesSent.add(metricName, new Rate(new Count()));
-            metricName = metrics.metricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", metricTags);
+            this.bytesSent.add(createMeter(metrics, metricGrpName, metricTags,
+                    "outgoing-byte", "outgoing bytes sent to all servers"));
+            this.bytesSent.add(createMeter(metrics, metricGrpName, metricTags, new Count(),
+                    "request", "requests sent"));
+            MetricName metricName = metrics.metricName("request-size-avg", metricGrpName, "The average size of requests sent.", metricTags);
             this.bytesSent.add(metricName, new Avg());
-            metricName = metrics.metricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", metricTags);
+            metricName = metrics.metricName("request-size-max", metricGrpName, "The maximum size of any request sent.", metricTags);
             this.bytesSent.add(metricName, new Max());
 
             this.bytesReceived = sensor("bytes-received:" + tagsSuffix.toString(), bytesTransferred);
-            metricName = metrics.metricName("incoming-byte-rate", metricGrpName, "Bytes/second read off all sockets", metricTags);
-            this.bytesReceived.add(metricName, new Rate());
-            metricName = metrics.metricName("response-rate", metricGrpName, "Responses received sent per second.", metricTags);
-            this.bytesReceived.add(metricName, new Rate(new Count()));
+            this.bytesReceived.add(createMeter(metrics, metricGrpName, metricTags,
+                    "incoming-byte", "bytes read off all sockets"));
+            this.bytesReceived.add(createMeter(metrics, metricGrpName, metricTags,
+                    new Count(), "response", "responses received"));
 
             this.selectTime = sensor("select-time:" + tagsSuffix.toString());
-            metricName = metrics.metricName("select-rate", metricGrpName, "Number of times the I/O layer checked for new I/O to perform per second", metricTags);
-            this.selectTime.add(metricName, new Rate(new Count()));
+            this.selectTime.add(createMeter(metrics, metricGrpName, metricTags,
+                    new Count(), "select", "times the I/O layer checked for new I/O to perform"));
             metricName = metrics.metricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags);
             this.selectTime.add(metricName, new Avg());
-            metricName = metrics.metricName("io-wait-ratio", metricGrpName, "The fraction of time the I/O thread spent waiting.", metricTags);
-            this.selectTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
+            this.selectTime.add(createIOThreadRatioMeter(metrics, metricGrpName, metricTags, "io-wait", "waiting"));
 
             this.ioTime = sensor("io-time:" + tagsSuffix.toString());
             metricName = metrics.metricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", metricTags);
             this.ioTime.add(metricName, new Avg());
-            metricName = metrics.metricName("io-ratio", metricGrpName, "The fraction of time the I/O thread spent doing I/O", metricTags);
-            this.ioTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
+            this.ioTime.add(createIOThreadRatioMeter(metrics, metricGrpName, metricTags, "io", "doing I/O"));
 
             metricName = metrics.metricName("connection-count", metricGrpName, "The current number of active connections.", metricTags);
             topLevelMetricNames.add(metricName);
@@ -910,6 +909,32 @@ public class Selector implements Selectable, AutoCloseable {
             });
         }
 
+        private Meter createMeter(Metrics metrics, String groupName, Map<String, String> metricTags,
+                SampledStat stat, String baseName, String descriptiveName) {
+            MetricName rateMetricName = metrics.metricName(baseName + "-rate", groupName,
+                            String.format("The number of %s per second", descriptiveName), metricTags);
+            MetricName totalMetricName = metrics.metricName(baseName + "-total", groupName,
+                            String.format("The total number of %s", descriptiveName), metricTags);
+            if (stat == null)
+                return new Meter(rateMetricName, totalMetricName);
+            else
+                return new Meter(stat, rateMetricName, totalMetricName);
+        }
+
+        private Meter createMeter(Metrics metrics, String groupName,  Map<String, String> metricTags,
+                String baseName, String descriptiveName) {
+            return createMeter(metrics, groupName, metricTags, null, baseName, descriptiveName);
+        }
+
+        private Meter createIOThreadRatioMeter(Metrics metrics, String groupName,  Map<String, String> metricTags,
+                String baseName, String action) {
+            MetricName rateMetricName = metrics.metricName(baseName + "-ratio", groupName,
+                    String.format("The fraction of time the I/O thread spent %s", action), metricTags);
+            MetricName totalMetricName = metrics.metricName(baseName + "time-total", groupName,
+                    String.format("The total time the I/O thread spent %s", action), metricTags);
+            return new Meter(TimeUnit.NANOSECONDS, rateMetricName, totalMetricName);
+        }
+
         private Sensor sensor(String name, Sensor... parents) {
             Sensor sensor = metrics.sensor(name, parents);
             sensors.add(sensor);
@@ -929,21 +954,17 @@ public class Selector implements Selectable, AutoCloseable {
                     tags.put("node-id", "node-" + connectionId);
 
                     nodeRequest = sensor(nodeRequestName);
-                    MetricName metricName = metrics.metricName("outgoing-byte-rate", metricGrpName, tags);
-                    nodeRequest.add(metricName, new Rate());
-                    metricName = metrics.metricName("request-rate", metricGrpName, "The average number of requests sent per second.", tags);
-                    nodeRequest.add(metricName, new Rate(new Count()));
-                    metricName = metrics.metricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", tags);
+                    nodeRequest.add(createMeter(metrics, metricGrpName, tags, "outgoing-byte", "outgoing bytes"));
+                    nodeRequest.add(createMeter(metrics, metricGrpName, tags, new Count(), "request", "requests sent"));
+                    MetricName metricName = metrics.metricName("request-size-avg", metricGrpName, "The average size of requests sent.", tags);
                     nodeRequest.add(metricName, new Avg());
-                    metricName = metrics.metricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", tags);
+                    metricName = metrics.metricName("request-size-max", metricGrpName, "The maximum size of any request sent.", tags);
                     nodeRequest.add(metricName, new Max());
 
                     String nodeResponseName = "node-" + connectionId + ".bytes-received";
                     Sensor nodeResponse = sensor(nodeResponseName);
-                    metricName = metrics.metricName("incoming-byte-rate", metricGrpName, tags);
-                    nodeResponse.add(metricName, new Rate());
-                    metricName = metrics.metricName("response-rate", metricGrpName, "The average number of responses received per second.", tags);
-                    nodeResponse.add(metricName, new Rate(new Count()));
+                    nodeResponse.add(createMeter(metrics, metricGrpName, tags, "incoming-byte", "incoming bytes"));
+                    nodeResponse.add(createMeter(metrics, metricGrpName, tags, new Count(), "response", "responses received"));
 
                     String nodeTimeName = "node-" + connectionId + ".latency";
                     Sensor nodeRequestTime = sensor(nodeTimeName);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a5e8666/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index 0a30490..8bcc775 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -20,7 +20,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Meter;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestUtils;
@@ -248,13 +248,15 @@ public class BufferPoolTest {
         Metrics mockedMetrics = createNiceMock(Metrics.class);
         Sensor mockedSensor = createNiceMock(Sensor.class);
         MetricName metricName = createNiceMock(MetricName.class);
+        MetricName rateMetricName = createNiceMock(MetricName.class);
+        MetricName totalMetricName = createNiceMock(MetricName.class);
 
         expect(mockedMetrics.sensor(BufferPool.WAIT_TIME_SENSOR_NAME)).andReturn(mockedSensor);
 
         mockedSensor.record(anyDouble(), anyLong());
         expectLastCall().andThrow(new OutOfMemoryError());
         expect(mockedMetrics.metricName(anyString(), eq(metricGroup), anyString())).andReturn(metricName);
-        mockedSensor.add(metricName, new Rate(TimeUnit.NANOSECONDS));
+        mockedSensor.add(new Meter(TimeUnit.NANOSECONDS, rateMetricName, totalMetricName));
 
         replay(mockedMetrics, mockedSensor, metricName);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a5e8666/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 0904a41..e24c3d7 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
 import org.apache.kafka.common.metrics.stats.Min;
 import org.apache.kafka.common.metrics.stats.Percentile;
 import org.apache.kafka.common.metrics.stats.Percentiles;
@@ -88,8 +89,10 @@ public class MetricsTest {
         s.add(metrics.metricName("test.avg", "grp1"), new Avg());
         s.add(metrics.metricName("test.max", "grp1"), new Max());
         s.add(metrics.metricName("test.min", "grp1"), new Min());
-        s.add(metrics.metricName("test.rate", "grp1"), new Rate(TimeUnit.SECONDS));
-        s.add(metrics.metricName("test.occurences", "grp1"), new Rate(TimeUnit.SECONDS, new Count()));
+        s.add(new Meter(TimeUnit.SECONDS, metrics.metricName("test.rate", "grp1"),
+                metrics.metricName("test.total", "grp1")));
+        s.add(new Meter(TimeUnit.SECONDS, new Count(), metrics.metricName("test.occurences", "grp1"),
+                metrics.metricName("test.occurences.total", "grp1")));
         s.add(metrics.metricName("test.count", "grp1"), new Count());
         s.add(new Percentiles(100, -100, 100, BucketSizing.CONSTANT,
                              new Percentile(metrics.metricName("test.median", "grp1"), 50.0),
@@ -439,7 +442,10 @@ public class MetricsTest {
         // Use the default time window. Set 3 samples
         MetricConfig cfg = new MetricConfig().samples(3);
         Sensor s = metrics.sensor("test.sensor", cfg);
-        s.add(metrics.metricName("test.rate", "grp1"), new Rate(TimeUnit.SECONDS));
+        MetricName rateMetricName = metrics.metricName("test.rate", "grp1");
+        MetricName totalMetricName = metrics.metricName("test.total", "grp1");
+        s.add(new Meter(TimeUnit.SECONDS, rateMetricName, totalMetricName));
+        KafkaMetric totalMetric = metrics.metrics().get(metrics.metricName("test.total", "grp1"));
 
         int sum = 0;
         int count = cfg.samples() - 1;
@@ -448,6 +454,7 @@ public class MetricsTest {
             s.record(100);
             sum += 100;
             time.sleep(cfg.timeWindowMs());
+            assertEquals(sum, totalMetric.value(), EPS);
         }
 
         // Sleep for half the window.
@@ -456,10 +463,11 @@ public class MetricsTest {
         // prior to any time passing
         double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + cfg.timeWindowMs() / 2) / 1000.0;
 
-        KafkaMetric km = metrics.metrics().get(metrics.metricName("test.rate", "grp1"));
-        assertEquals("Rate(0...2) = 2.666", sum / elapsedSecs, km.value(), EPS);
+        KafkaMetric rateMetric = metrics.metrics().get(metrics.metricName("test.rate", "grp1"));
+        assertEquals("Rate(0...2) = 2.666", sum / elapsedSecs, rateMetric.value(), EPS);
         assertEquals("Elapsed Time = 75 seconds", elapsedSecs,
-                ((Rate) km.measurable()).windowSize(cfg, time.milliseconds()) / 1000, EPS);
+                ((Rate) rateMetric.measurable()).windowSize(cfg, time.milliseconds()) / 1000, EPS);
+        assertEquals(sum, totalMetric.value(), EPS);
     }
 
     public static class ConstantMeasurable implements Measurable {

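The assertions added to MetricsTest above capture the behavioural contract: the rate is still computed over the configured sample windows and decays as time passes, while the new total is a plain cumulative sum of everything recorded on the sensor. A self-contained sketch of that contract, with illustrative names (not part of the patch):

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.KafkaMetric;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Meter;

    public class MeterTotalSketch {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();
            Sensor sensor = metrics.sensor("bytes-sensor");
            MetricName rateName = metrics.metricName("bytes-rate", "example-group");
            MetricName totalName = metrics.metricName("bytes-total", "example-group");
            sensor.add(new Meter(TimeUnit.SECONDS, rateName, totalName));

            for (int i = 0; i < 3; i++)
                sensor.record(100.0);

            // The rate depends on elapsed window time; the total is simply the
            // sum of all recorded values, 300.0 here.
            KafkaMetric rate = metrics.metrics().get(rateName);
            KafkaMetric total = metrics.metrics().get(totalName);
            System.out.println("rate=" + rate.value() + ", total=" + total.value());
            metrics.close();
        }
    }
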
http://git-wip-us.apache.org/repos/asf/kafka/blob/8a5e8666/clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java
new file mode 100644
index 0000000..8204771
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.CompoundStat.NamedMeasurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.junit.Test;
+
+public class MeterTest {
+
+    private static final double EPS = 0.0000001d;
+
+    @Test
+    public void testMeter() {
+        Map<String, String> emptyTags = Collections.emptyMap();
+        MetricName rateMetricName = new MetricName("rate", "test", "", emptyTags);
+        MetricName totalMetricName = new MetricName("total", "test", "", emptyTags);
+        Meter meter = new Meter(rateMetricName, totalMetricName);
+        List<NamedMeasurable> stats = meter.stats();
+        assertEquals(2, stats.size());
+        NamedMeasurable total = stats.get(0);
+        NamedMeasurable rate = stats.get(1);
+        assertEquals(rateMetricName, rate.name());
+        assertEquals(totalMetricName, total.name());
+        Rate rateStat = (Rate) rate.stat();
+        Total totalStat = (Total) total.stat();
+
+        MetricConfig config = new MetricConfig();
+        double nextValue = 0.0;
+        double expectedTotal = 0.0;
+        long now = 0;
+        double intervalMs = 100;
+        double delta = 5.0;
+
+        // Record values in multiple windows and verify that rates are reported
+        // for time windows and that the total is cumulative.
+        for (int i = 1; i <= 100; i++) {
+            for (; now < i * 1000; now += intervalMs, nextValue += delta) {
+                expectedTotal += nextValue;
+                meter.record(config, nextValue, now);
+            }
+            assertEquals(expectedTotal, totalStat.measure(config, now), EPS);
+            long windowSizeMs = rateStat.windowSize(config, now);
+            long windowStartMs = Math.max(now - windowSizeMs, 0);
+            double sampledTotal = 0.0;
+            double prevValue = nextValue - delta;
+            for (long timeMs = now - 100; timeMs >= windowStartMs; timeMs -= intervalMs, prevValue -= delta)
+                sampledTotal += prevValue;
+            assertEquals(sampledTotal * 1000 / windowSizeMs, rateStat.measure(config, now), EPS);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a5e8666/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index de2bcde..875652d 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -33,7 +33,7 @@ import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
 import org.apache.kafka.common.metrics._
-import org.apache.kafka.common.metrics.stats.Rate
+import org.apache.kafka.common.metrics.stats.Meter
 import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, KafkaChannel, ListenerName, Selectable, Send, Selector => KSelector}
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
@@ -64,7 +64,8 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
 
   private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
   private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent", "socket-server-metrics")
-  memoryPoolSensor.add(memoryPoolDepletedPercentMetricName, new Rate(TimeUnit.MILLISECONDS))
+  private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", "socket-server-metrics")
+  memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
   private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
   val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
   private val processors = new Array[Processor](totalProcessorThreads)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a5e8666/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index aad9b6a..604bbf3 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -21,7 +21,7 @@ import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.{MetricName, TopicPartition}
+import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
 import org.apache.kafka.common.errors.InvalidTopicException
 import org.apache.kafka.common.header.Headers
 import org.apache.kafka.common.record.{CompressionType, TimestampType}
@@ -1508,6 +1508,55 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     servers.foreach(assertNoExemptRequestMetric(_))
   }
 
+  // Rate metrics of both Producer and Consumer are verified by this test
+  @Test
+  def testRateMetricsHaveCumulativeCount() {
+    val numRecords = 100
+    sendRecords(numRecords)
+
+    val consumer = this.consumers.head
+    consumer.assign(List(tp).asJava)
+    consumer.seek(tp, 0)
+    consumeAndVerifyRecords(consumer, numRecords = numRecords, startingOffset = 0)
+
+    def exists(name: String, rateMetricName: MetricName, allMetricNames: Set[MetricName]): Boolean = {
+      allMetricNames.contains(new MetricName(name, rateMetricName.group, "", rateMetricName.tags))
+    }
+
+    def verify(rateMetricName: MetricName, allMetricNames: Set[MetricName]): Unit = {
+      val name = rateMetricName.name
+      val totalExists = exists(name.replace("-rate", "-total"), rateMetricName, allMetricNames)
+      val totalTimeExists = exists(name.replace("-rate", "-time"), rateMetricName, allMetricNames)
+      assertTrue(s"No cumulative count/time metric for rate metric $rateMetricName",
+          totalExists || totalTimeExists)
+    }
+
+    val consumerMetricNames = consumer.metrics.keySet.asScala.toSet
+    consumerMetricNames.filter(_.name.endsWith("-rate"))
+        .foreach(verify(_, consumerMetricNames))
+
+    val producer = this.producers.head
+    val producerMetricNames = producer.metrics.keySet.asScala.toSet
+    val producerExclusions = Set("compression-rate") // compression-rate is an Average metric, not Rate
+    producerMetricNames.filter(_.name.endsWith("-rate"))
+        .filterNot(metricName => producerExclusions.contains(metricName.name))
+        .foreach(verify(_, producerMetricNames))
+
+    def verifyMetric(name: String, metrics: java.util.Map[MetricName, _ <: Metric], entity: String): Unit = {
+      val entry = metrics.asScala.find { case (metricName, _) => metricName.name == name }
+      assertTrue(s"$entity metric not defined $name", entry.nonEmpty)
+      entry.foreach { case (metricName, metric) =>
+        assertTrue(s"$entity metric not recorded $metricName", metric.value > 0.0)
+      }
+    }
+
+    // Check a couple of metrics of consumer and producer to ensure that values are set
+    verifyMetric("records-consumed-rate", consumer.metrics, "Consumer")
+    verifyMetric("records-consumed-total", consumer.metrics, "Consumer")
+    verifyMetric("record-send-rate", producer.metrics, "Producer")
+    verifyMetric("record-send-total", producer.metrics, "Producer")
+  }
+
   def runMultiConsumerSessionTimeoutTest(closeConsumer: Boolean): Unit = {
     // use consumers defined in this class plus one additional consumer
     // Use topic defined in this class + one additional topic

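The new integration test encodes the naming convention the KIP relies on: every metric whose name ends in "-rate" (excluding Avg-based metrics such as compression-rate) should have a sibling "-total" or "-time" metric with the same group and tags. A rough Java equivalent of that check, applicable to any map returned by Consumer.metrics() or Producer.metrics(); the class and method names are illustrative only:

    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;

    public class RateTotalConventionCheck {
        // Throws AssertionError if some "-rate" metric has no cumulative counterpart.
        static void check(Map<MetricName, ? extends Metric> metrics) {
            Set<MetricName> names = metrics.keySet();
            for (MetricName rateName : names) {
                if (!rateName.name().endsWith("-rate") || rateName.name().equals("compression-rate"))
                    continue;
                // MetricName equality ignores the description, so an empty description is fine here.
                MetricName total = new MetricName(rateName.name().replace("-rate", "-total"),
                        rateName.group(), "", rateName.tags());
                MetricName time = new MetricName(rateName.name().replace("-rate", "-time"),
                        rateName.group(), "", rateName.tags());
                if (!names.contains(total) && !names.contains(time))
                    throw new AssertionError("No cumulative count/time metric for rate metric " + rateName);
            }
        }
    }
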
http://git-wip-us.apache.org/repos/asf/kafka/blob/8a5e8666/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index eea41aa..73f443e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.metrics.Metrics;
@@ -32,7 +33,8 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.SampledStat;
 import org.apache.kafka.common.metrics.stats.Sum;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaClientSupplier;
@@ -492,34 +494,42 @@ public class StreamThread extends Thread implements ThreadDataProvider {
             commitTimeSensor = metrics.sensor(prefix + ".commit-latency", Sensor.RecordingLevel.INFO);
             commitTimeSensor.add(metrics.metricName("commit-latency-avg", this.groupName, "The average commit time in ms", this.tags), new Avg());
             commitTimeSensor.add(metrics.metricName("commit-latency-max", this.groupName, "The maximum commit time in ms", this.tags), new Max());
-            commitTimeSensor.add(metrics.metricName("commit-rate", this.groupName, "The average per-second number of commit calls", this.tags), new Rate(new Count()));
+            commitTimeSensor.add(createMeter(metrics, new Count(), "commit", "commit calls"));
 
             pollTimeSensor = metrics.sensor(prefix + ".poll-latency", Sensor.RecordingLevel.INFO);
             pollTimeSensor.add(metrics.metricName("poll-latency-avg", this.groupName, "The average poll time in ms", this.tags), new Avg());
             pollTimeSensor.add(metrics.metricName("poll-latency-max", this.groupName, "The maximum poll time in ms", this.tags), new Max());
-            pollTimeSensor.add(metrics.metricName("poll-rate", this.groupName, "The average per-second number of record-poll calls", this.tags), new Rate(new Count()));
+            pollTimeSensor.add(createMeter(metrics, new Count(), "poll", "record-poll calls"));
 
             processTimeSensor = metrics.sensor(prefix + ".process-latency", Sensor.RecordingLevel.INFO);
             processTimeSensor.add(metrics.metricName("process-latency-avg", this.groupName, "The average process time in ms", this.tags), new Avg());
             processTimeSensor.add(metrics.metricName("process-latency-max", this.groupName, "The maximum process time in ms", this.tags), new Max());
-            processTimeSensor.add(metrics.metricName("process-rate", this.groupName, "The average per-second number of process calls", this.tags), new Rate(new Count()));
+            processTimeSensor.add(createMeter(metrics, new Count(), "process", "process calls"));
 
             punctuateTimeSensor = metrics.sensor(prefix + ".punctuate-latency", Sensor.RecordingLevel.INFO);
             punctuateTimeSensor.add(metrics.metricName("punctuate-latency-avg", this.groupName, "The average punctuate time in ms", this.tags), new Avg());
             punctuateTimeSensor.add(metrics.metricName("punctuate-latency-max", this.groupName, "The maximum punctuate time in ms", this.tags), new Max());
-            punctuateTimeSensor.add(metrics.metricName("punctuate-rate", this.groupName, "The average per-second number of punctuate calls", this.tags), new Rate(new Count()));
+            punctuateTimeSensor.add(createMeter(metrics, new Count(), "punctuate", "punctuate calls"));
 
             taskCreatedSensor = metrics.sensor(prefix + ".task-created", Sensor.RecordingLevel.INFO);
-            taskCreatedSensor.add(metrics.metricName("task-created-rate", this.groupName, "The average per-second number of newly created tasks", this.tags), new Rate(new Count()));
+            taskCreatedSensor.add(createMeter(metrics, new Count(), "task-created", "newly created tasks"));
 
             tasksClosedSensor = metrics.sensor(prefix + ".task-closed", Sensor.RecordingLevel.INFO);
-            tasksClosedSensor.add(metrics.metricName("task-closed-rate", this.groupName, "The average per-second number of closed tasks", this.tags), new Rate(new Count()));
+            tasksClosedSensor.add(createMeter(metrics, new Count(), "task-closed", "closed tasks"));
 
             skippedRecordsSensor = metrics.sensor(prefix + ".skipped-records");
-            skippedRecordsSensor.add(metrics.metricName("skipped-records-rate", this.groupName, "The average per-second number of skipped records.", this.tags), new Rate(new Sum()));
+            skippedRecordsSensor.add(createMeter(metrics, new Sum(), "skipped-records", "skipped records"));
 
         }
 
+        private Meter createMeter(Metrics metrics, SampledStat stat, String baseName, String descriptiveName) {
+            MetricName rateMetricName = metrics.metricName(baseName + "-rate", groupName,
+                    String.format("The average per-second number of %s", descriptiveName), tags);
+            MetricName totalMetricName = metrics.metricName(baseName + "-total", groupName,
+                    String.format("The total number of %s", descriptiveName), tags);
+            return new Meter(stat, rateMetricName, totalMetricName);
+        }
+
         void removeAllSensors() {
             removeSensor(commitTimeSensor);
             removeSensor(pollTimeSensor);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8a5e8666/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
index 70d421e..b254bb8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Meter;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.slf4j.Logger;
@@ -171,8 +171,15 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     }
 
     private void addThroughputMetrics(String scopeName, Sensor sensor, String opName, Map<String, String> tags) {
-        maybeAddMetric(sensor, metrics.metricName(opName + "-rate", groupNameFromScope(scopeName),
-            "The average number of occurrence of " + opName + " operation per second.", tags), new Rate(new Count()));
+        MetricName rateMetricName = metrics.metricName(opName + "-rate", groupNameFromScope(scopeName),
+            "The average number of occurrence of " + opName + " operation per second.", tags);
+        MetricName totalMetricName = metrics.metricName(opName + "-total", groupNameFromScope(scopeName),
+                "The total number of occurrence of " + opName + " operations.", tags);
+        if (!metrics.metrics().containsKey(rateMetricName) && !metrics.metrics().containsKey(totalMetricName)) {
+            sensor.add(new Meter(new Count(), rateMetricName, totalMetricName));
+        } else {
+            log.trace("Trying to add metric twice: {} {}", rateMetricName, totalMetricName);
+        }
     }
 
     public void maybeAddMetric(Sensor sensor, MetricName name, MeasurableStat stat) {

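addThroughputMetrics now guards the Meter registration itself because Metrics refuses to register the same MetricName twice (a duplicate registration throws IllegalArgumentException); the same operation-level names can be requested more than once, which is why maybeAddMetric existed for the single-metric case. The guard in isolation, assuming `metrics`, `sensor` and `tags` are in scope and with illustrative names:

    // Register the rate/total pair only if neither name is already taken;
    // a duplicate MetricName would otherwise fail the whole sensor setup.
    MetricName rateName = metrics.metricName("op-rate", "group", "rate description", tags);
    MetricName totalName = metrics.metricName("op-total", "group", "total description", tags);
    if (!metrics.metrics().containsKey(rateName) && !metrics.metrics().containsKey(totalName))
        sensor.add(new Meter(new Count(), rateName, totalName));
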
http://git-wip-us.apache.org/repos/asf/kafka/blob/8a5e8666/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 994a3a4..33d55e2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -302,8 +302,8 @@ public class KafkaStreamsTest {
     public void testNumberDefaultMetrics() {
         final KafkaStreams streams = createKafkaStreams();
         final Map<MetricName, ? extends Metric> metrics = streams.metrics();
-        // all 15 default StreamThread metrics + 1 metric that keeps track of number of metrics
-        assertEquals(metrics.size(), 16);
+        // all 22 default StreamThread metrics + 1 metric that keeps track of number of metrics
+        assertEquals(23, metrics.size());
     }
 
     @Test

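The jump from 16 to 23 expected metrics follows directly from the StreamThread change above: each of the seven thread-level rate sensors (commit, poll, process, punctuate, task-created, task-closed and skipped-records) now registers an additional "-total" metric via createMeter, so the 15 default StreamThread metrics become 15 + 7 = 22, and the metric that tracks the number of registered metrics brings the expected count to 23.
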
http://git-wip-us.apache.org/repos/asf/kafka/blob/8a5e8666/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
index 215d4f1..7b16246 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
@@ -80,8 +80,10 @@ public class StreamsMetricsImplTest {
         Sensor sensor1 = streamsMetrics.addLatencyAndThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
 
         Map<MetricName, ? extends Metric> metrics = streamsMetrics.metrics();
-        // 6 metrics plus a common metric that keeps track of total registered metrics in Metrics() constructor
-        assertEquals(metrics.size(), 7);
+        // 2 meters and 4 non-meter metrics plus a common metric that keeps track of total registered metrics in Metrics() constructor
+        int meterMetricsCount = 2; // Each Meter is a combination of a Rate and a Total
+        int otherMetricsCount = 4;
+        assertEquals(meterMetricsCount * 2 + otherMetricsCount + 1, metrics.size());
 
         streamsMetrics.removeSensor(sensor1);
         metrics = streamsMetrics.metrics();
@@ -100,8 +102,9 @@ public class StreamsMetricsImplTest {
         Sensor sensor1 = streamsMetrics.addThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
 
         Map<MetricName, ? extends Metric> metrics = streamsMetrics.metrics();
-        // 2 metrics plus a common metric that keeps track of total registered metrics in Metrics() constructor
-        assertEquals(metrics.size(), 3);
+        int meterMetricsCount = 2; // Each Meter is a combination of a Rate and a Total
+        // 2 meter metrics plus a common metric that keeps track of total registered metrics in Metrics() constructor
+        assertEquals(meterMetricsCount * 2 + 1, metrics.size());
 
         streamsMetrics.removeSensor(sensor1);
         metrics = streamsMetrics.metrics();

