kafka-jira mailing list archives

From "huxihx (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6362) auto commit not work since coordinatorUnknown() is always true.
Date Fri, 02 Feb 2018 01:00:55 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349609#comment-16349609 ]

huxihx commented on KAFKA-6362:
-------------------------------

[~hachikuji] I reproduced the issue in my local test environment using a standalone consumer.
It seems there is no code path for a standalone consumer to rediscover the coordinator, even
after the coordinator comes back up.
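
For reference, a minimal sketch of the standalone (assign()-based) consumer setup involved
here, with auto commit enabled. The broker address, topic, and group id below are placeholders
rather than values taken from this report:

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class StandaloneAutoCommitRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "standalone-auto-commit-test"); // placeholder group
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "5000");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // assign() instead of subscribe(): no group rebalance protocol is used,
            // which is the "standalone consumer" case discussed above
            consumer.assign(Collections.singletonList(new TopicPartition("test-topic", 0)));
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<byte[], byte[]> record : records)
                    System.out.println(record.offset());
                // with auto commit enabled, offsets should be committed in the
                // background every auto.commit.interval.ms; in this issue the
                // commits stop once the coordinator becomes unknown
            }
        }
    }
}
{code}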

> auto commit not work since coordinatorUnknown() is always true.
> ---------------------------------------------------------------
>
>                 Key: KAFKA-6362
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6362
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.10.2.1
>            Reporter: Renkai Ge
>            Assignee: huxihx
>            Priority: Major
>
> {code}
> [2017-12-14 20:09:23.501] [Kafka 0.10 Fetcher for Source: source_bj-docker-large (14/40)] INFO  org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
> 	auto.commit.interval.ms = 5000
> 	auto.offset.reset = latest
> 	bootstrap.servers = [11.192.77.42:3002, 11.192.73.43:3002, 11.192.73.66:3002]
> 	check.crcs = true
> 	client.id = 
> 	connections.max.idle.ms = 540000
> 	enable.auto.commit = true
> 	exclude.internal.topics = true
> 	fetch.max.bytes = 52428800
> 	fetch.max.wait.ms = 500
> 	fetch.min.bytes = 1
> 	group.id = tcprtdetail_flink
> 	heartbeat.interval.ms = 3000
> 	interceptor.classes = null
> 	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> 	max.partition.fetch.bytes = 1048576
> 	max.poll.interval.ms = 300000
> 	max.poll.records = 500
> 	metadata.max.age.ms = 300000
> 	metric.reporters = []
> 	metrics.num.samples = 2
> 	metrics.recording.level = INFO
> 	metrics.sample.window.ms = 30000
> 	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
> 	receive.buffer.bytes = 65536
> 	reconnect.backoff.ms = 50
> 	request.timeout.ms = 305000
> 	retry.backoff.ms = 100
> 	sasl.jaas.config = null
> 	sasl.kerberos.kinit.cmd = /usr/bin/kinit
> 	sasl.kerberos.min.time.before.relogin = 60000
> 	sasl.kerberos.service.name = null
> 	sasl.kerberos.ticket.renew.jitter = 0.05
> 	sasl.kerberos.ticket.renew.window.factor = 0.8
> 	sasl.mechanism = GSSAPI
> 	security.protocol = PLAINTEXT
> 	send.buffer.bytes = 131072
> 	session.timeout.ms = 10000
> 	ssl.cipher.suites = null
> 	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> 	ssl.endpoint.identification.algorithm = null
> 	ssl.key.password = null
> 	ssl.keymanager.algorithm = SunX509
> 	ssl.keystore.location = null
> 	ssl.keystore.password = null
> 	ssl.keystore.type = JKS
> 	ssl.protocol = TLS
> 	ssl.provider = null
> 	ssl.secure.random.implementation = null
> 	ssl.trustmanager.algorithm = PKIX
> 	ssl.truststore.location = null
> 	ssl.truststore.password = null
> 	ssl.truststore.type = JKS
> 	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: source_bj-docker-large (14/40)] INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1
> [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: source_bj-docker-large (14/40)] INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : e89bffd6b2eff799
> {code}
> My Kafka Java client cannot auto-commit. After adding some debug logging, I found that the coordinatorUnknown()
function in [ConsumerCoordinator.java#L604|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L604]
always returns true, and nextAutoCommitDeadline just increases infinitely. Should there be a
lookupCoordinator() call after line 604, like in [ConsumerCoordinator.java#L508|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L508]?
After I added lookupCoordinator() next to line 604, the consumer could auto-commit offsets properly.
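
A rough sketch of the change the description above proposes, for illustration only: the method
and field names follow the 0.10.2.1 ConsumerCoordinator sources linked above, and this is the
reporter's suggestion rather than the committed patch.

{code}
// Sketch (not the committed fix): when the coordinator is unknown, also trigger
// a lookup, mirroring what the poll() path around ConsumerCoordinator.java#L508
// already does, so a standalone (assign()) consumer can rediscover the
// coordinator and resume auto-committing.
void maybeAutoCommitOffsetsAsync(long now) {
    if (autoCommitEnabled) {
        if (coordinatorUnknown()) {
            lookupCoordinator();  // proposed addition from the description above
            this.nextAutoCommitDeadline = now + retryBackoffMs;
        } else if (now >= nextAutoCommitDeadline) {
            this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
            doAutoCommitOffsetsAsync();
        }
    }
}
{code}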



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
