kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch 1.1 updated: KAFKA-6590; Fix bug in aggregation of consumer fetch bytes and counts metrics (#4278)
Date Sun, 25 Feb 2018 02:46:16 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 301357a  KAFKA-6590; Fix bug in aggregation of consumer fetch bytes and counts metrics (#4278)
301357a is described below

commit 301357a2e87e9330588b8e462106f56c43e3df8d
Author: Igor Kostiakov <igorkos@kth.se>
AuthorDate: Sun Feb 25 03:43:10 2018 +0100

    KAFKA-6590; Fix bug in aggregation of consumer fetch bytes and counts metrics (#4278)
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>, Jason Gustafson <jason@confluent.io>
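
    For context, a minimal sketch of the aggregation pattern this commit fixes. It is not
    the actual Fetcher internals; the class and method names below are hypothetical and only
    the field names (fetchMetrics, topicFetchMetrics, unrecordedPartitions) mirror the diff.
    Each partition's result is added to both a per-topic counter and a fetch-level total, and
    once every expected partition has reported, the aggregate sensors must be fed the
    fetch-level totals rather than whichever per-topic counter happened to be updated last.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class FetchMetricsSketch {
        static class FetchMetrics {
            long fetchBytes;
            long fetchRecords;
            void increment(long bytes, long records) {
                fetchBytes += bytes;
                fetchRecords += records;
            }
        }

        private final Set<String> unrecordedPartitions;                // partitions still expected to report
        private final FetchMetrics fetchMetrics = new FetchMetrics();  // totals across the whole fetch
        private final Map<String, FetchMetrics> topicFetchMetrics = new HashMap<>();

        FetchMetricsSketch(Set<String> expectedPartitions) {
            this.unrecordedPartitions = new HashSet<>(expectedPartitions);
        }

        void record(String topic, String partition, long bytes, long records) {
            unrecordedPartitions.remove(partition);
            fetchMetrics.increment(bytes, records);
            topicFetchMetrics.computeIfAbsent(topic, t -> new FetchMetrics()).increment(bytes, records);

            if (unrecordedPartitions.isEmpty()) {
                // The fix: report the fetch-level totals, not the per-topic counter,
                // which under-counted fetches spanning multiple topics.
                recordAggregate(fetchMetrics.fetchBytes, fetchMetrics.fetchRecords);
            }
        }

        // Stand-in for sensors.bytesFetched.record(...) / sensors.recordsFetched.record(...)
        void recordAggregate(long bytes, long records) {
            System.out.printf("aggregate: bytes=%d records=%d%n", bytes, records);
        }

        public static void main(String[] args) {
            FetchMetricsSketch sketch =
                    new FetchMetricsSketch(new HashSet<>(Arrays.asList("foo-0", "bar-0")));
            sketch.record("foo", "foo-0", 100, 3);
            sketch.record("bar", "bar-0", 100, 3);  // prints bytes=200 records=6, matching the updated test
        }
    }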
---
 .../kafka/clients/consumer/internals/Fetcher.java  |  4 +-
 .../clients/consumer/internals/FetcherTest.java    | 54 ++++++++++++++++------
 2 files changed, 41 insertions(+), 17 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 7eb94d3..e81195e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1265,8 +1265,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
 
             if (this.unrecordedPartitions.isEmpty()) {
                 // once all expected partitions from the fetch have reported in, record the metrics
-                this.sensors.bytesFetched.record(topicFetchMetric.fetchBytes);
-                this.sensors.recordsFetched.record(topicFetchMetric.fetchRecords);
+                this.sensors.bytesFetched.record(this.fetchMetrics.fetchBytes);
+                this.sensors.recordsFetched.record(this.fetchMetrics.fetchRecords);
 
                 // also record per-topic metrics
                 for (Map.Entry<String, FetchMetrics> entry: this.topicFetchMetrics.entrySet()) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index bf36d86..832b127 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1408,26 +1408,50 @@ public class FetcherTest {
 
     @Test
     public void testFetchResponseMetrics() {
-        subscriptions.assignFromUser(singleton(tp0));
-        subscriptions.seek(tp0, 0);
+        String topic1 = "foo";
+        String topic2 = "bar";
+        TopicPartition tp1 = new TopicPartition(topic1, 0);
+        TopicPartition tp2 = new TopicPartition(topic2, 0);
+
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put(topic1, 1);
+        partitionCounts.put(topic2, 1);
+        Cluster cluster = TestUtils.clusterWith(1, partitionCounts);
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
-        Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
-        KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
-        KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));
-
-        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
-                TimestampType.CREATE_TIME, 0L);
-        for (int v = 0; v < 3; v++)
-            builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
-        MemoryRecords records = builder.build();
+        subscriptions.assignFromUser(Utils.mkSet(tp1, tp2));
 
         int expectedBytes = 0;
-        for (Record record : records.records())
-            expectedBytes += record.sizeInBytes();
+        LinkedHashMap<TopicPartition, FetchResponse.PartitionData> fetchPartitionData = new LinkedHashMap<>();
 
-        fetchRecords(tp0, records, Errors.NONE, 100L, 0);
+        for (TopicPartition tp : Utils.mkSet(tp1, tp2)) {
+            subscriptions.seek(tp, 0);
+
+            MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+                    TimestampType.CREATE_TIME, 0L);
+            for (int v = 0; v < 3; v++)
+                builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
+            MemoryRecords records = builder.build();
+            for (Record record : records.records())
+                expectedBytes += record.sizeInBytes();
+
+            fetchPartitionData.put(tp, new FetchResponse.PartitionData(Errors.NONE, 15L,
+                    FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records));
+        }
+
+        assertEquals(1, fetcher.sendFetches());
+        client.prepareResponse(new FetchResponse(Errors.NONE, fetchPartitionData, 0, INVALID_SESSION_ID));
+        consumerClient.poll(0);
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
+        assertEquals(3, fetchedRecords.get(tp1).size());
+        assertEquals(3, fetchedRecords.get(tp2).size());
+
+        Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+        KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
+        KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));
         assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
-        assertEquals(3, recordsCountAverage.value(), EPSILON);
+        assertEquals(6, recordsCountAverage.value(), EPSILON);
     }
 
     @Test

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.
