kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-4429; Consumer lag metric should be zero if FetchResponse is empty
Date Tue, 03 Jan 2017 22:11:39 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a565a77b1 -> b565dd7eb


KAFKA-4429; Consumer lag metric should be zero if FetchResponse is empty

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Ewen Cheslack-Postava <me@ewencp.org>, Jason Gustafson <jason@confluent.io>

Closes #2155 from lindong28/KAFKA-4429


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b565dd7e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b565dd7e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b565dd7e

Branch: refs/heads/trunk
Commit: b565dd7eb184da5ef3b08c88a8acc3df221aaa08
Parents: a565a77
Author: Dong Lin <lindong28@gmail.com>
Authored: Tue Jan 3 13:54:40 2017 -0800
Committer: Jason Gustafson <jason@confluent.io>
Committed: Tue Jan 3 13:54:40 2017 -0800

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     |  3 ++
 .../clients/consumer/internals/FetcherTest.java | 49 +++++++++++++++-----
 2 files changed, 40 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b565dd7e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 526b0a9..22588a8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -701,6 +701,9 @@ public class Fetcher<K, V> {
                     parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed);
                     ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
                     this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
+                } else if (partition.highWatermark >= 0) {
+                    log.trace("Received empty fetch response for partition {} with offset {}", tp, position);
+                    this.sensors.recordsFetchLag.record(partition.highWatermark - fetchOffset);
                 }
             } else if (error == Errors.NOT_LEADER_FOR_PARTITION) {
                 log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());

http://git-wip-us.apache.org/repos/asf/kafka/blob/b565dd7e/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 272a5ee..0095697 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -607,7 +607,6 @@ public class FetcherTest {
      */
     @Test
     public void testQuotaMetrics() throws Exception {
-        List<ConsumerRecord<byte[], byte[]>> records;
         subscriptions.assignFromUser(singleton(tp));
         subscriptions.seek(tp, 0);
 
@@ -615,17 +614,10 @@ public class FetcherTest {
         for (int i = 1; i < 4; i++) {
             // We need to make sure the message offset grows. Otherwise they will be considered as already consumed
             // and filtered out by consumer.
-            if (i > 1) {
-                MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
-                for (int v = 0; v < 3; v++) {
-                    builder.appendWithOffset((long) i * 3 + v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes());
-                }
-                this.records = builder.build();
-            }
-            assertEquals(1, fetcher.sendFetches());
-            client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 100 * i));
-            consumerClient.poll(0);
-            records = fetcher.fetchedRecords().get(tp);
+            MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
+            for (int v = 0; v < 3; v++)
+                builder.appendWithOffset((long) i * 3 + v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes());
+            List<ConsumerRecord<byte[], byte[]>> records = fetchRecords(builder.build(), Errors.NONE.code(), 100L, 100 * i).get(tp);
             assertEquals(3, records.size());
         }
 
@@ -636,6 +628,39 @@ public class FetcherTest {
         assertEquals(300, maxMetric.value(), EPSILON);
     }
 
+    /*
+     * Send multiple fetch requests and verify that the client-side fetch lag metrics have the right values
+     */
+    @Test
+    public void testFetcherMetrics() {
+        subscriptions.assignFromUser(singleton(tp));
+        subscriptions.seek(tp, 0);
+
+        Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+        KafkaMetric recordsFetchLagMax = allMetrics.get(metrics.metricName("records-lag-max", metricGroup, ""));
+
+        // recordsFetchLagMax should be initialized to negative infinity
+        assertEquals(Double.NEGATIVE_INFINITY, recordsFetchLagMax.value(), EPSILON);
+
+        // recordsFetchLagMax should be hw - fetchOffset after receiving an empty FetchResponse
+        fetchRecords(MemoryRecords.EMPTY, Errors.NONE.code(), 100L, 0);
+        assertEquals(100, recordsFetchLagMax.value(), EPSILON);
+
+        // recordsFetchLagMax should be hw - offset of the last message after receiving a non-empty FetchResponse
+        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
+        for (int v = 0; v < 3; v++)
+            builder.appendWithOffset((long) v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes());
+        fetchRecords(builder.build(), Errors.NONE.code(), 200L, 0);
+        assertEquals(198, recordsFetchLagMax.value(), EPSILON);
+    }
+
+    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(MemoryRecords records, short error, long hw, int throttleTime) {
+        assertEquals(1, fetcher.sendFetches());
+        client.prepareResponse(fetchResponse(records, error, hw, throttleTime));
+        consumerClient.poll(0);
+        return fetcher.fetchedRecords();
+    }
+
     @Test
     public void testGetOffsetsForTimesTimeout() {
         try {


Mime
View raw message