kafka-jira mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6362) auto commit not work since coordinatorUnknown() is always true.
Date Thu, 08 Feb 2018 17:50:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357298#comment-16357298 ]

ASF GitHub Bot commented on KAFKA-6362:
---------------------------------------

hachikuji closed pull request #4326: KAFKA-6362: maybeAutoCommitOffsetsAsync should try to discover coordinator
URL: https://github.com/apache/kafka/pull/4326

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 1d84f847cd8..2f7fd58a66f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1058,7 +1058,7 @@ public void assign(Collection<TopicPartition> partitions) {
 
                 // make sure the offsets of topic partitions the consumer is unsubscribing from
                 // are committed since there will be no following rebalance
-                this.coordinator.maybeAutoCommitOffsetsNow();
+                this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
 
                 log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
                 this.subscriptions.assignFromUser(new HashSet<>(partitions));
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 5c1e60eee82..d7c1ce9966e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -528,6 +528,7 @@ public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offs
                 public void onSuccess(Void value) {
                     pendingAsyncCommits.decrementAndGet();
                     doCommitOffsetsAsync(offsets, callback);
+                    client.pollNoWakeup();
                 }
 
                 @Override
@@ -623,20 +624,10 @@ public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets,
         return false;
     }
 
-    private void maybeAutoCommitOffsetsAsync(long now) {
-        if (autoCommitEnabled) {
-            if (coordinatorUnknown()) {
-                this.nextAutoCommitDeadline = now + retryBackoffMs;
-            } else if (now >= nextAutoCommitDeadline) {
-                this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
-                doAutoCommitOffsetsAsync();
-            }
-        }
-    }
-
-    public void maybeAutoCommitOffsetsNow() {
-        if (autoCommitEnabled && !coordinatorUnknown())
+    public void maybeAutoCommitOffsetsAsync(long now) {
+        if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
             doAutoCommitOffsetsAsync();
+        }
     }
 
     private void doAutoCommitOffsetsAsync() {
@@ -650,8 +641,11 @@ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception
                     log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
                     if (exception instanceof RetriableException)
                         nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline);
+                    else
+                        nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;
                 } else {
                     log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
+                    nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;
                 }
             }
         });
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 76301a71bea..c49339b6525 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1625,6 +1625,25 @@ public void testHeartbeatThreadClose() throws Exception {
             assertFalse("Heartbeat thread active after close", threads[i].getName().contains(groupId));
     }
 
+    @Test
+    public void testAutoCommitAfterCoordinatorBackToService() {
+        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
+                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
+        subscriptions.assignFromUser(Collections.singleton(t1p));
+        subscriptions.seek(t1p, 100L);
+
+        coordinator.coordinatorDead();
+        assertTrue(coordinator.coordinatorUnknown());
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
+
+        // async commit offset should find coordinator
+        time.sleep(autoCommitIntervalMs); // sleep for a while to ensure auto commit does happen
+        coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
+        assertFalse(coordinator.coordinatorUnknown());
+        assertEquals(subscriptions.committed(t1p).offset(), 100L);
+    }
+
     private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGroupManagement,
                                                                final boolean autoCommit,
                                                                final boolean leaveGroup) {
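The deadline bookkeeping this patch introduces can be modeled on its own. The sketch below is a simplified, hypothetical stand-in for `ConsumerCoordinator`, not Kafka code: the class name `AutoCommitSchedule`, the `Outcome` enum and the synchronous completion are assumptions for illustration. As in the patched `maybeAutoCommitOffsetsAsync(long now)`, the gate checks only `autoCommitEnabled` and `nextAutoCommitDeadline`, and the completion handler moves the deadline according to the result, following the `onComplete` hunk.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the auto-commit scheduling in the patched
// ConsumerCoordinator. Not Kafka API: commits "complete" synchronously here,
// whereas the real client sends them asynchronously (and may first have to
// look up the coordinator inside commitOffsetsAsync).
class AutoCommitSchedule {
    enum Outcome { SUCCESS, RETRIABLE_FAILURE, NON_RETRIABLE_FAILURE }

    private final boolean autoCommitEnabled;
    private final long autoCommitIntervalMs;
    private final long retryBackoffMs;
    private long nextAutoCommitDeadline;
    // Timestamps at which a commit was actually attempted.
    final List<Long> attempts = new ArrayList<>();

    AutoCommitSchedule(boolean enabled, long intervalMs, long backoffMs, long startMs) {
        this.autoCommitEnabled = enabled;
        this.autoCommitIntervalMs = intervalMs;
        this.retryBackoffMs = backoffMs;
        this.nextAutoCommitDeadline = startMs + intervalMs;
    }

    long nextDeadline() {
        return nextAutoCommitDeadline;
    }

    // Patched gate: only "enabled" and "deadline reached"; there is no
    // coordinatorUnknown() check that could postpone the commit forever.
    void maybeAutoCommitOffsetsAsync(long now, Outcome outcome) {
        if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
            attempts.add(now);
            onComplete(now, outcome);
        }
    }

    // Patched completion handler: the deadline moves only once the commit finishes.
    private void onComplete(long now, Outcome outcome) {
        if (outcome == Outcome.RETRIABLE_FAILURE) {
            nextAutoCommitDeadline = Math.min(now + retryBackoffMs, nextAutoCommitDeadline);
        } else {
            // Success and non-retriable failure both wait a full interval.
            nextAutoCommitDeadline = now + autoCommitIntervalMs;
        }
    }

    public static void main(String[] args) {
        // Values from the reporter's config: auto.commit.interval.ms=5000, retry.backoff.ms=100.
        AutoCommitSchedule s = new AutoCommitSchedule(true, 5000, 100, 0);
        s.maybeAutoCommitOffsetsAsync(4999, Outcome.SUCCESS);            // too early: no attempt
        s.maybeAutoCommitOffsetsAsync(5000, Outcome.SUCCESS);            // attempt; deadline -> 10000
        s.maybeAutoCommitOffsetsAsync(10000, Outcome.RETRIABLE_FAILURE); // attempt; deadline stays 10000
        s.maybeAutoCommitOffsetsAsync(10050, Outcome.SUCCESS);           // retried; deadline -> 15050
        // prints attempts=[5000, 10000, 10050] next=15050
        System.out.println("attempts=" + s.attempts + " next=" + s.nextDeadline());
    }
}
```

In this synchronous model a retriable failure leaves the already-expired deadline in place (`Math.min` picks it over `now + retryBackoffMs`), so the next call retries straight away, while a success or a non-retriable failure waits a full `auto.commit.interval.ms`.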


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> auto commit not work since coordinatorUnknown() is always true.
> ---------------------------------------------------------------
>
>                 Key: KAFKA-6362
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6362
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.10.2.1
>            Reporter: Renkai Ge
>            Assignee: huxihx
>            Priority: Major
>
> {code}
> [2017-12-14 20:09:23.501] [Kafka 0.10 Fetcher for Source: source_bj-docker-large (14/40)] INFO  org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
> 	auto.commit.interval.ms = 5000
> 	auto.offset.reset = latest
> 	bootstrap.servers = [11.192.77.42:3002, 11.192.73.43:3002, 11.192.73.66:3002]
> 	check.crcs = true
> 	client.id = 
> 	connections.max.idle.ms = 540000
> 	enable.auto.commit = true
> 	exclude.internal.topics = true
> 	fetch.max.bytes = 52428800
> 	fetch.max.wait.ms = 500
> 	fetch.min.bytes = 1
> 	group.id = tcprtdetail_flink
> 	heartbeat.interval.ms = 3000
> 	interceptor.classes = null
> 	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> 	max.partition.fetch.bytes = 1048576
> 	max.poll.interval.ms = 300000
> 	max.poll.records = 500
> 	metadata.max.age.ms = 300000
> 	metric.reporters = []
> 	metrics.num.samples = 2
> 	metrics.recording.level = INFO
> 	metrics.sample.window.ms = 30000
> 	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
> 	receive.buffer.bytes = 65536
> 	reconnect.backoff.ms = 50
> 	request.timeout.ms = 305000
> 	retry.backoff.ms = 100
> 	sasl.jaas.config = null
> 	sasl.kerberos.kinit.cmd = /usr/bin/kinit
> 	sasl.kerberos.min.time.before.relogin = 60000
> 	sasl.kerberos.service.name = null
> 	sasl.kerberos.ticket.renew.jitter = 0.05
> 	sasl.kerberos.ticket.renew.window.factor = 0.8
> 	sasl.mechanism = GSSAPI
> 	security.protocol = PLAINTEXT
> 	send.buffer.bytes = 131072
> 	session.timeout.ms = 10000
> 	ssl.cipher.suites = null
> 	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> 	ssl.endpoint.identification.algorithm = null
> 	ssl.key.password = null
> 	ssl.keymanager.algorithm = SunX509
> 	ssl.keystore.location = null
> 	ssl.keystore.password = null
> 	ssl.keystore.type = JKS
> 	ssl.protocol = TLS
> 	ssl.provider = null
> 	ssl.secure.random.implementation = null
> 	ssl.trustmanager.algorithm = PKIX
> 	ssl.truststore.location = null
> 	ssl.truststore.password = null
> 	ssl.truststore.type = JKS
> 	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: source_bj-docker-large (14/40)] INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1
> [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: source_bj-docker-large (14/40)] INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : e89bffd6b2eff799
> {code}
> My Kafka Java client cannot auto-commit. After adding some debug logging, I found that the coordinatorUnknown() function in [ConsumerCoordinator.java#L604|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L604] always returns true, and nextAutoCommitDeadline just keeps increasing indefinitely. Should there be a lookupCoordinator() call after line 604, as in [ConsumerCoordinator.java#L508|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L508]? After I added lookupCoordinator() next to line 604, the consumer auto-committed offsets properly.
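The loop the reporter describes can be reproduced with a small, hypothetical model of the pre-patch `maybeAutoCommitOffsetsAsync` (the removed lines in the diff). The class `LegacyAutoCommitGate`, the 1-second poll cadence and the instant coordinator discovery are assumptions for illustration; the interval (5000 ms) and backoff (100 ms) come from the logged config. It also assumes, as in the reporter's setup, that nothing else in the poll loop looks up the coordinator, so `coordinatorUnknown()` stays true unless the suggested `lookupCoordinator()` call is added.

```java
// Hypothetical model of the 0.10.2.1 auto-commit gate and the reporter's suggested
// fix. Not Kafka API: "lookup" is modeled as instantly discovering the coordinator,
// while the real lookupCoordinator() sends a request and the coordinator only
// becomes known once the response is handled in a later poll.
class LegacyAutoCommitGate {
    private final long autoCommitIntervalMs;
    private final long retryBackoffMs;
    private final boolean lookupWhenUnknown; // true = reporter's proposed change
    private boolean coordinatorKnown = false;
    private long nextAutoCommitDeadline;
    int commits = 0;

    LegacyAutoCommitGate(long intervalMs, long backoffMs, boolean lookupWhenUnknown) {
        this.autoCommitIntervalMs = intervalMs;
        this.retryBackoffMs = backoffMs;
        this.lookupWhenUnknown = lookupWhenUnknown;
        this.nextAutoCommitDeadline = intervalMs;
    }

    long nextDeadline() {
        return nextAutoCommitDeadline;
    }

    // Same branch structure as the removed method (auto-commit assumed enabled).
    void maybeAutoCommitOffsetsAsync(long now) {
        if (!coordinatorKnown) {
            // Original behaviour: only push the deadline out by retryBackoffMs.
            nextAutoCommitDeadline = now + retryBackoffMs;
            if (lookupWhenUnknown)
                coordinatorKnown = true; // stands in for lookupCoordinator() succeeding
        } else if (now >= nextAutoCommitDeadline) {
            nextAutoCommitDeadline = now + autoCommitIntervalMs;
            commits++;
        }
    }

    // Calls the gate every pollIntervalMs up to endMs, as a consumer poll loop might.
    static LegacyAutoCommitGate run(boolean lookupWhenUnknown, long pollIntervalMs, long endMs) {
        LegacyAutoCommitGate gate = new LegacyAutoCommitGate(5000, 100, lookupWhenUnknown);
        for (long t = 0; t <= endMs; t += pollIntervalMs)
            gate.maybeAutoCommitOffsetsAsync(t);
        return gate;
    }

    public static void main(String[] args) {
        LegacyAutoCommitGate stuck = run(false, 1000, 60_000);
        LegacyAutoCommitGate fixed = run(true, 1000, 60_000);
        System.out.println("without lookup: commits=" + stuck.commits + ", next=" + stuck.nextDeadline());
        System.out.println("with lookup:    commits=" + fixed.commits + ", next=" + fixed.nextDeadline());
    }
}
```

Over one simulated minute, `main` prints `commits=0, next=60100` for the unmodified branch (the deadline only ever trails the latest poll by 100 ms) and `commits=12, next=61000` once the lookup is added.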



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
