kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-4172; Ensure fetch responses contain the requested partitions
Date Thu, 15 Sep 2016 01:05:06 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c1bce2d75 -> 084a19e9a


KAFKA-4172; Ensure fetch responses contain the requested partitions

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1857 from hachikuji/KAFKA-4172


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/084a19e9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/084a19e9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/084a19e9

Branch: refs/heads/trunk
Commit: 084a19e9acb43666cfcaa2ca155a775d47cd8b39
Parents: c1bce2d
Author: Jason Gustafson <jason@confluent.io>
Authored: Wed Sep 14 18:04:58 2016 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Wed Sep 14 18:04:58 2016 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     | 15 +++++
 .../clients/consumer/KafkaConsumerTest.java     | 62 ++++++++++++++++----
 2 files changed, 65 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/084a19e9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 23a8511..dd9d084 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -141,6 +141,12 @@ public class Fetcher<K, V> {
         return numInFlightFetches.get() > 0;
     }
 
+    private boolean matchesRequestedPartitions(FetchRequest request, FetchResponse response) {
+        Set<TopicPartition> requestedPartitions = request.fetchData().keySet();
+        Set<TopicPartition> fetchedPartitions = response.responseData().keySet();
+        return fetchedPartitions.equals(requestedPartitions);
+    }
+
     /**
      * Set-up a fetch request for any node that we have assigned partitions for which doesn't already have
      * an in-flight fetch or pending fetch data.
@@ -158,6 +164,15 @@ public class Fetcher<K, V> {
                             numInFlightFetches.decrementAndGet();
 
                             FetchResponse response = new FetchResponse(resp.responseBody());
+                            if (!matchesRequestedPartitions(request, response)) {
+                                // obviously we expect the broker to always send us valid responses, so this check
+                                // is mainly for test cases where mock fetch responses must be manually crafted.
+                                log.warn("Ignoring fetch response containing partitions {} since it does not match " +
+                                        "the requested partitions {}", response.responseData().keySet(),
+                                        request.fetchData().keySet());
+                                return;
+                            }
+
                             Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
                             FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/084a19e9/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index dbe3d67..0791f13 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -73,6 +73,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
 import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -139,7 +141,7 @@ public class KafkaConsumerTest {
     public void testSubscription() {
         KafkaConsumer<byte[], byte[]> consumer = newConsumer();
 
-        consumer.subscribe(Collections.singletonList(topic));
+        consumer.subscribe(singletonList(topic));
         assertEquals(singleton(topic), consumer.subscription());
         assertTrue(consumer.assignment().isEmpty());
 
@@ -147,7 +149,7 @@ public class KafkaConsumerTest {
         assertTrue(consumer.subscription().isEmpty());
         assertTrue(consumer.assignment().isEmpty());
 
-        consumer.assign(Collections.singletonList(tp0));
+        consumer.assign(singletonList(tp0));
         assertTrue(consumer.subscription().isEmpty());
         assertEquals(singleton(tp0), consumer.assignment());
 
@@ -175,7 +177,7 @@ public class KafkaConsumerTest {
         String nullTopic = null;
 
         try {
-            consumer.subscribe(Collections.singletonList(nullTopic));
+            consumer.subscribe(singletonList(nullTopic));
         } finally {
             consumer.close();
         }
@@ -187,7 +189,7 @@ public class KafkaConsumerTest {
         String emptyTopic = "  ";
 
         try {
-            consumer.subscribe(Collections.singletonList(emptyTopic));
+            consumer.subscribe(singletonList(emptyTopic));
         } finally {
             consumer.close();
         }
@@ -299,7 +301,7 @@ public class KafkaConsumerTest {
     public void testPause() {
         KafkaConsumer<byte[], byte[]> consumer = newConsumer();
 
-        consumer.assign(Collections.singletonList(tp0));
+        consumer.assign(singletonList(tp0));
         assertEquals(singleton(tp0), consumer.assignment());
         assertTrue(consumer.paused().isEmpty());
 
@@ -569,6 +571,41 @@ public class KafkaConsumerTest {
         assertEquals(5, records.count());
     }
 
+    @Test
+    public void fetchResponseWithUnexpectedPartitionIsIgnored() {
+        int rebalanceTimeoutMs = 60000;
+        int sessionTimeoutMs = 30000;
+        int heartbeatIntervalMs = 3000;
+
+        // adjust auto commit interval lower than heartbeat so we don't need to deal with
+        // a concurrent heartbeat request
+        int autoCommitIntervalMs = 1000;
+
+        Time time = new MockTime();
+        MockClient client = new MockClient(time);
+        Cluster cluster = TestUtils.singletonCluster(singletonMap(topic, 1));
+        Node node = cluster.nodes().get(0);
+        client.setNode(node);
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        metadata.update(cluster, time.milliseconds());
+        PartitionAssignor assignor = new RangeAssignor();
+
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
+
+        consumer.subscribe(singletonList(topic), getConsumerRebalanceListener(consumer));
+
+        prepareRebalance(client, node, assignor, singletonList(tp0), null);
+
+        Map<TopicPartition, FetchInfo> fetches1 = new HashMap<>();
+        fetches1.put(tp0, new FetchInfo(0, 1));
+        fetches1.put(t2p0, new FetchInfo(0, 10)); // not assigned and not fetched
+        client.prepareResponseFrom(fetchResponse(fetches1), node);
+
+        ConsumerRecords<String, String> records = consumer.poll(0);
+        assertEquals(0, records.count());
+    }
+
     /**
      * Verify that when a consumer changes its topic subscription its assigned partitions
      * do not immediately change, and the latest consumed offsets of its to-be-revoked
@@ -631,12 +668,17 @@ public class KafkaConsumerTest {
 
         ConsumerRecords<String, String> records = consumer.poll(0);
 
+        // clear out the prefetch so it doesn't interfere with the rest of the test
+        fetches1.put(tp0, new FetchInfo(1, 0));
+        fetches1.put(t2p0, new FetchInfo(10, 0));
+        client.respondFrom(fetchResponse(fetches1), node);
+        client.poll(0, time.milliseconds());
+
         // verify that the fetch occurred as expected
         assertEquals(11, records.count());
         assertEquals(1L, consumer.position(tp0));
         assertEquals(10L, consumer.position(t2p0));
 
-
         // subscription change
         consumer.subscribe(Arrays.asList(topic, topic3), getConsumerRebalanceListener(consumer));
 
@@ -655,12 +697,10 @@ public class KafkaConsumerTest {
         // mock rebalance responses
         prepareRebalance(client, node, assignor, Arrays.asList(tp0, t3p0), coordinator);
 
-        // mock a response to the outstanding fetch so that we have data available on the next poll
+        // mock a response to the next fetch from the new assignment
         Map<TopicPartition, FetchInfo> fetches2 = new HashMap<>();
         fetches2.put(tp0, new FetchInfo(1, 1));
         fetches2.put(t3p0, new FetchInfo(0, 100));
-        client.respondFrom(fetchResponse(fetches2), node);
-        client.poll(0, time.milliseconds());
         client.prepareResponse(fetchResponse(fetches2));
 
         records = consumer.poll(0);
@@ -679,14 +719,12 @@ public class KafkaConsumerTest {
         assertTrue(consumer.assignment().size() == 2);
         assertTrue(consumer.assignment().contains(tp0) && consumer.assignment().contains(t3p0));
 
-
         // mock the offset commit response for to be revoked partitions
         Map<TopicPartition, Long> partitionOffsets2 = new HashMap<>();
         partitionOffsets2.put(tp0, 2L);
         partitionOffsets2.put(t3p0, 100L);
         commitReceived = prepareOffsetCommitResponse(client, coordinator, partitionOffsets2);
 
-        // unsubscribe
         consumer.unsubscribe();
 
         // verify that subscription and assignment are both cleared
@@ -993,7 +1031,7 @@ public class KafkaConsumerTest {
         Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>();
         for (Map.Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) {
             partitionData.put(partitionOffset.getKey(), new ListOffsetResponse.PartitionData(error,
-                    Collections.singletonList(partitionOffset.getValue())));
+                    singletonList(partitionOffset.getValue())));
         }
         return new ListOffsetResponse(partitionData).toStruct();
     }


Mime
View raw message