kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6877; Remove completedFetch upon a failed parse if it contains no records.
Date Wed, 09 May 2018 01:57:59 GMT
This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7afcb3a  KAFKA-6877; Remove completedFetch upon a failed parse if it contains no records.
7afcb3a is described below

commit 7afcb3a64c53831f05f99037dfa07aba1d396c48
Author: Adem Efe Gencer <agencer@linkedin.com>
AuthorDate: Tue May 8 18:57:34 2018 -0700

    KAFKA-6877; Remove completedFetch upon a failed parse if it contains no records.
    
    This patch removes a completedFetch from the completedFetches queue upon a failed parse if it contains no records. The following scenario explains why this is needed for one instance of this case, namely a TopicAuthorizationException.
    
    0. Assume a scenario in which the consumer is attempting to read from a topic without the necessary read permission.
    1. In Fetcher#fetchedRecords(), after peeking the completedFetches queue, Fetcher#parseCompletedFetch(CompletedFetch) throws a TopicAuthorizationException (as expected).
    2. Fetcher#fetchedRecords() passes the TopicAuthorizationException up without getting a chance to poll completedFetches, so the same completedFetch remains at the head of the completedFetches queue.
    3. On subsequent calls to Fetcher#fetchedRecords(), peeking the completedFetches queue will always return the same completedFetch, regardless of any updates to the ACLs of the topic being read.
    4. Hence, despite the creation of an ACL with the correct permissions, once the consumer sees the TopicAuthorizationException it is unable to recover without a bounce.
    
    Author: Adem Efe Gencer <agencer@linkedin.com>
    
    Reviewers: Jiangjie (Becket) Qin <becket.qin@gmail.com>
    
    Closes #4974 from efeg/fix/parseCompletedFetchRemainsInQueue
---
 .../kafka/clients/consumer/internals/Fetcher.java  | 18 ++++-
 .../clients/consumer/internals/FetcherTest.java    | 80 +++++++++++++++++++++-
 2 files changed, 94 insertions(+), 4 deletions(-)
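
A minimal standalone sketch of the queue-handling pattern described in the scenario above (PendingFetch, FetchQueueSketch, and parse are illustrative names, not Kafka's API; the actual change to Fetcher#fetchedRecords() is in the diff below): on a parse failure, the head of the queue is dropped before rethrowing, but only when it carries no records and nothing has been fetched yet in the current call.

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;

    // Illustrative stand-ins; not Kafka classes.
    class PendingFetch {
        final List<String> records;
        PendingFetch(List<String> records) { this.records = records; }
        boolean hasNoRecords() { return records.isEmpty(); }
    }

    class FetchQueueSketch {
        private final Queue<PendingFetch> completedFetches = new ArrayDeque<>();

        void enqueue(PendingFetch fetch) { completedFetches.add(fetch); }

        // Hypothetical parse step: fails for fetches that carry no records, standing in
        // for e.g. a TopicAuthorizationException reported on an empty partition response.
        private List<String> parse(PendingFetch fetch) {
            if (fetch.hasNoRecords())
                throw new IllegalStateException("parse failed");
            return fetch.records;
        }

        List<String> fetchedRecords() {
            List<String> fetched = new ArrayList<>();
            PendingFetch head;
            while ((head = completedFetches.peek()) != null) {
                try {
                    fetched.addAll(parse(head));
                } catch (RuntimeException e) {
                    // Drop the failed head so later calls can make progress, but only if it
                    // holds no records and nothing has been fetched yet, so no data is lost.
                    if (fetched.isEmpty() && head.hasNoRecords())
                        completedFetches.poll();
                    throw e;
                }
                completedFetches.poll();
            }
            return fetched;
        }
    }

Without the poll() inside the catch block, peek() would keep returning the same failed entry on every subsequent call, which is exactly the stuck-queue behavior the patch fixes.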

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 6d8fb6c..b3791ff 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -472,6 +472,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
      * @return The fetched records per partition
      * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
      *         the defaultResetPolicy is NONE
+     * @throws TopicAuthorizationException If there is TopicAuthorization error in fetchResponse.
      */
     public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
         Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
@@ -483,7 +484,20 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                     CompletedFetch completedFetch = completedFetches.peek();
                     if (completedFetch == null) break;
 
-                    nextInLineRecords = parseCompletedFetch(completedFetch);
+                    try {
+                        nextInLineRecords = parseCompletedFetch(completedFetch);
+                    } catch (Exception e) {
+                        // Remove a completedFetch upon a parse with exception if (1) it contains no records, and
+                        // (2) there are no fetched records with actual content preceding this exception.
+                        // The first condition ensures that the completedFetches is not stuck with the same completedFetch
+                        // in cases such as the TopicAuthorizationException, and the second condition ensures that no
+                        // potential data loss due to an exception in a following record.
+                        FetchResponse.PartitionData partition = completedFetch.partitionData;
+                        if (fetched.isEmpty() && (partition.records == null || partition.records.sizeInBytes() == 0)) {
+                            completedFetches.poll();
+                        }
+                        throw e;
+                    }
                     completedFetches.poll();
                 } else {
                     List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
@@ -945,7 +959,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                 this.metadata.requestUpdate();
             } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
                 if (fetchOffset != subscriptions.position(tp)) {
-                    log.debug("Discarding stale fetch response for partition {} since the fetched offset {}" +
+                    log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
                             "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
                 } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
                     log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 6de1186..7157470 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -59,6 +59,7 @@ import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.AbstractRequest;
@@ -117,6 +118,8 @@ public class FetcherTest {
     private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics";
     private TopicPartition tp0 = new TopicPartition(topicName, 0);
     private TopicPartition tp1 = new TopicPartition(topicName, 1);
+    private TopicPartition tp2 = new TopicPartition(topicName, 2);
+    private TopicPartition tp3 = new TopicPartition(topicName, 3);
     private int minBytes = 1;
     private int maxBytes = Integer.MAX_VALUE;
     private int maxWaitMs = 0;
@@ -126,7 +129,7 @@ public class FetcherTest {
     private MockTime time = new MockTime(1);
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
     private MockClient client = new MockClient(time, metadata);
-    private Cluster cluster = TestUtils.singletonCluster(topicName, 2);
+    private Cluster cluster = TestUtils.singletonCluster(topicName, 4);
     private Node node = cluster.nodes().get(0);
     private Metrics metrics = new Metrics(time);
     FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry("consumer" + groupId);
@@ -140,6 +143,7 @@ public class FetcherTest {
     private MemoryRecords records;
     private MemoryRecords nextRecords;
     private MemoryRecords emptyRecords;
+    private MemoryRecords partialRecords;
     private Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, metrics);
     private Metrics fetcherMetrics = new Metrics(time);
     private Fetcher<byte[], byte[]> fetcherNoAutoReset = createFetcher(subscriptionsNoAutoReset, fetcherMetrics);
@@ -162,6 +166,11 @@ public class FetcherTest {
 
         builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
         emptyRecords = builder.build();
+
+        builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 4L);
+        builder.append(0L, "key".getBytes(), "value-0".getBytes());
+        partialRecords = builder.build();
+        partialRecords.buffer().putInt(Records.SIZE_OFFSET, 10000);
     }
 
     @After
@@ -833,7 +842,7 @@ public class FetcherTest {
         partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100,
            FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, records));
         partitions.put(tp0, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
-                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
         client.prepareResponse(new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions),
             0, INVALID_SESSION_ID));
         consumerClient.poll(0);
@@ -864,6 +873,73 @@ public class FetcherTest {
     }
 
     @Test
+    public void testCompletedFetchRemoval() {
+        // Ensure the removal of completed fetches that cause an Exception if and only if they contain empty records.
+        subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp0, tp1, tp2, tp3));
+        subscriptionsNoAutoReset.seek(tp0, 1);
+        subscriptionsNoAutoReset.seek(tp1, 1);
+        subscriptionsNoAutoReset.seek(tp2, 1);
+        subscriptionsNoAutoReset.seek(tp3, 1);
+
+        assertEquals(1, fetcherNoAutoReset.sendFetches());
+
+        Map<TopicPartition, FetchResponse.PartitionData> partitions = new LinkedHashMap<>();
+        partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+            FetchResponse.INVALID_LOG_START_OFFSET, null, records));
+        partitions.put(tp0, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
+        partitions.put(tp2, new FetchResponse.PartitionData(Errors.NONE, 100L, 4,
+            0L, null, nextRecords));
+        partitions.put(tp3, new FetchResponse.PartitionData(Errors.NONE, 100L, 4,
+            0L, null, partialRecords));
+        client.prepareResponse(new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions),
+                                                 0, INVALID_SESSION_ID));
+        consumerClient.poll(0);
+
+        List<ConsumerRecord<byte[], byte[]>> fetchedRecords = new ArrayList<>();
+        for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
+            fetchedRecords.addAll(records);
+
+        assertEquals(fetchedRecords.size(), subscriptionsNoAutoReset.position(tp1) - 1);
+        assertEquals(4, subscriptionsNoAutoReset.position(tp1).longValue());
+        assertEquals(3, fetchedRecords.size());
+
+        List<OffsetOutOfRangeException> oorExceptions = new ArrayList<>();
+        try {
+            for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
+                fetchedRecords.addAll(records);
+        } catch (OffsetOutOfRangeException oor) {
+            oorExceptions.add(oor);
+        }
+
+        // Should have received one OffsetOutOfRangeException for partition tp0
+        assertEquals(1, oorExceptions.size());
+        OffsetOutOfRangeException oor = oorExceptions.get(0);
+        assertTrue(oor.offsetOutOfRangePartitions().containsKey(tp0));
+        assertEquals(oor.offsetOutOfRangePartitions().size(), 1);
+
+        for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
+            fetchedRecords.addAll(records);
+
+        // Should not have received an Exception for tp2.
+        assertEquals(6, subscriptionsNoAutoReset.position(tp2).longValue());
+        assertEquals(5, fetchedRecords.size());
+
+        int numExceptionsExpected = 3;
+        List<KafkaException> kafkaExceptions = new ArrayList<>();
+        for (int i = 1; i <= numExceptionsExpected; i++) {
+            try {
+                for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
+                    fetchedRecords.addAll(records);
+            } catch (KafkaException e) {
+                kafkaExceptions.add(e);
+            }
+        }
+        // Should have received as many as numExceptionsExpected Kafka exceptions for tp3.
+        assertEquals(numExceptionsExpected, kafkaExceptions.size());
+    }
+
+    @Test
     public void testSeekBeforeException() {
         Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptionsNoAutoReset, new Metrics(time), 2);
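
For completeness, a rough consumer-side illustration of the recovery this patch enables (the topic name, group id, and configuration values below are placeholders, and the bare retry loop is a sketch rather than a recommended policy): a consumer that keeps polling through TopicAuthorizationException can proceed once a read ACL for the topic is created, without a restart, because the failed completedFetch no longer stays stuck at the head of the queue.

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.TopicAuthorizationException;

    public class AuthorizationRecoveryExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "example-group");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("restricted-topic"));
                while (true) {
                    try {
                        ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
                        for (ConsumerRecord<byte[], byte[]> record : records)
                            System.out.printf("offset=%d%n", record.offset());
                    } catch (TopicAuthorizationException e) {
                        // Keep polling; once the read ACL is granted, subsequent polls succeed
                        // without bouncing the consumer.
                        System.err.println("Not yet authorized for: " + e.unauthorizedTopics());
                    }
                }
            }
        }
    }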
 

-- 
To stop receiving notification emails like this one, please contact
jqin@apache.org.
