kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 1.1 updated: KAFKA-6362; Async auto-commit should discover coordinator if it is unknown (#4326)
Date Thu, 08 Feb 2018 17:59:45 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new f973183  KAFKA-6362; Async auto-commit should discover coordinator if it is unknown (#4326)
f973183 is described below

commit f9731836a522c308f1f27699561b0f93888d0c02
Author: huxi <huxi_2b@hotmail.com>
AuthorDate: Fri Feb 9 01:49:12 2018 +0800

    KAFKA-6362; Async auto-commit should discover coordinator if it is unknown (#4326)
    
    Currently `maybeAutoCommitOffsetsAsync` does not try to find the coordinator if it is unknown. As a result, asynchronous auto-commits will fail indefinitely. This patch changes the behavior to add coordinator discovery to the async auto-commit path.
---
 .../apache/kafka/clients/consumer/KafkaConsumer.java |  2 +-
 .../consumer/internals/ConsumerCoordinator.java      | 20 +++++++-------------
 .../consumer/internals/ConsumerCoordinatorTest.java  | 19 +++++++++++++++++++
 3 files changed, 27 insertions(+), 14 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 1d84f84..2f7fd58 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1058,7 +1058,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
                 // make sure the offsets of topic partitions the consumer is unsubscribing from
                 // are committed since there will be no following rebalance
-                this.coordinator.maybeAutoCommitOffsetsNow();
+                this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
 
                 log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
                 this.subscriptions.assignFromUser(new HashSet<>(partitions));
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 5c1e60e..d7c1ce9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -528,6 +528,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 public void onSuccess(Void value) {
                     pendingAsyncCommits.decrementAndGet();
                     doCommitOffsetsAsync(offsets, callback);
+                    client.pollNoWakeup();
                 }
 
                 @Override
@@ -623,20 +624,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         return false;
     }
 
-    private void maybeAutoCommitOffsetsAsync(long now) {
-        if (autoCommitEnabled) {
-            if (coordinatorUnknown()) {
-                this.nextAutoCommitDeadline = now + retryBackoffMs;
-            } else if (now >= nextAutoCommitDeadline) {
-                this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
-                doAutoCommitOffsetsAsync();
-            }
-        }
-    }
-
-    public void maybeAutoCommitOffsetsNow() {
-        if (autoCommitEnabled && !coordinatorUnknown())
+    public void maybeAutoCommitOffsetsAsync(long now) {
+        if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
             doAutoCommitOffsetsAsync();
+        }
     }
 
     private void doAutoCommitOffsetsAsync() {
@@ -650,8 +641,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                     log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
                     if (exception instanceof RetriableException)
                         nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline);
+                    else
+                        nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;
                 } else {
                     log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
+                    nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;
                 }
             }
         });
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 76301a7..c49339b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1625,6 +1625,25 @@ public class ConsumerCoordinatorTest {
             assertFalse("Heartbeat thread active after close", threads[i].getName().contains(groupId));
     }
 
+    @Test
+    public void testAutoCommitAfterCoordinatorBackToService() {
+        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
+                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
+        subscriptions.assignFromUser(Collections.singleton(t1p));
+        subscriptions.seek(t1p, 100L);
+
+        coordinator.coordinatorDead();
+        assertTrue(coordinator.coordinatorUnknown());
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
+
+        // async commit offset should find coordinator
+        time.sleep(autoCommitIntervalMs); // sleep for a while to ensure auto commit does happen
+        coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
+        assertFalse(coordinator.coordinatorUnknown());
+        assertEquals(subscriptions.committed(t1p).offset(), 100L);
+    }
+
     private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGroupManagement,
                                                                final boolean autoCommit,
                                                               final boolean leaveGroup) {

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.

Mime
View raw message