kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-4234; Revert automatic offset commit behavior in consumer's `unsubscribe()`
Date Fri, 30 Sep 2016 20:28:57 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 ac6cce77e -> abf056c08


KAFKA-4234; Revert automatic offset commit behavior in consumer's `unsubscribe()`

Temporarily disable the offset commit (when auto commit is enabled) in the new consumer's
`unsubscribe()` method towards a workaround for the issue reported in [KAFKA-3491](https://issues.apache.org/jira/browse/KAFKA-3491).
For now, a call to `unsubscribe()` can be made to reset the offsets in case processing the
batch received from the most recent `poll()` is interrupted (due to some exception).

Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #1944 from vahidhashemian/KAFKA-4234

(cherry picked from commit 20322446aa261dec8b51e6e4514307e926a29ba5)
Signed-off-by: Jason Gustafson <jason@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/abf056c0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/abf056c0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/abf056c0

Branch: refs/heads/0.10.1
Commit: abf056c08565d6966f4feccf722dac98be18dc31
Parents: ac6cce7
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Authored: Fri Sep 30 13:13:24 2016 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Fri Sep 30 13:28:55 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/clients/consumer/KafkaConsumer.java    |  4 ----
 .../kafka/clients/consumer/KafkaConsumerTest.java       | 12 +-----------
 2 files changed, 1 insertion(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/abf056c0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index d5b1a4b..830f071 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -874,10 +874,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
     public void unsubscribe() {
         acquire();
         try {
-            // make sure the offsets of topic partitions the consumer is unsubscribing from
-            // are committed since there will be no following rebalance
-            this.coordinator.maybeAutoCommitOffsetsNow();
-
             log.debug("Unsubscribed all topics or patterns and assigned partitions");
             this.subscriptions.unsubscribe();
             this.coordinator.maybeLeaveGroup();

http://git-wip-us.apache.org/repos/asf/kafka/blob/abf056c0/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 0096e72..b1c6962 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -614,8 +614,7 @@ public class KafkaConsumerTest {
      * do not immediately change, and the latest consumed offsets of its to-be-revoked
      * partitions are properly committed (when auto-commit is enabled).
      * Upon unsubscribing from subscribed topics the consumer subscription and assignment
-     * are both updated right away and its consumed offsets are committed (if auto-commit
-     * is enabled).
+     * are both updated right away but its consumed offsets are not auto committed.
      */
     @Test
     public void testSubscriptionChangesWithAutoCommitEnabled() {
@@ -722,21 +721,12 @@ public class KafkaConsumerTest {
         assertTrue(consumer.assignment().size() == 2);
         assertTrue(consumer.assignment().contains(tp0) && consumer.assignment().contains(t3p0));
 
-        // mock the offset commit response for to be revoked partitions
-        Map<TopicPartition, Long> partitionOffsets2 = new HashMap<>();
-        partitionOffsets2.put(tp0, 2L);
-        partitionOffsets2.put(t3p0, 100L);
-        commitReceived = prepareOffsetCommitResponse(client, coordinator, partitionOffsets2);
-
         consumer.unsubscribe();
 
         // verify that subscription and assignment are both cleared
         assertTrue(consumer.subscription().isEmpty());
         assertTrue(consumer.assignment().isEmpty());
 
-        // verify that the offset commits occurred as expected
-        assertTrue(commitReceived.get());
-
         consumer.close();
     }
 


Mime
View raw message