kafka-jira mailing list archives

From "Guozhang Wang (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-5868) Kafka Consumer Rebalancing takes too long
Date Fri, 08 Dec 2017 17:12:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-5868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16283857#comment-16283857 ]

Guozhang Wang commented on KAFKA-5868:
--------------------------------------

[~nandishkotadia] Have you tried the newer version {{1.0.0}} to see if this issue goes away?
Note that you can build your app with the 1.0.0 client, which can still talk to older-versioned brokers.
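For reference, the {{CommitFailedException}} quoted below suggests shrinking poll batches or widening the timeouts. A hedged sketch of how such consumer overrides can be passed through the Streams config is shown here; the keys use the {{consumer.}} prefix that Kafka Streams strips before handing them to its internal consumers, and all values are illustrative starting points, not tested recommendations:

```java
import java.util.Properties;

public class RebalanceTuning {

    // Sketch only: builds a Streams-style Properties object with consumer
    // overrides aimed at the rebalance/commit problem described below.
    public static Properties streamsProps() {
        Properties props = new Properties();
        // Broker list taken from the reporter's config dump
        props.put("bootstrap.servers",
            "kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092");
        // Fewer records per poll() shortens each iteration of the poll loop
        props.put("consumer.max.poll.records", "50");
        // More headroom before the coordinator evicts a slow member;
        // heartbeat.interval.ms should stay well below session.timeout.ms
        props.put("consumer.session.timeout.ms", "30000");
        props.put("consumer.heartbeat.interval.ms", "10000");
        return props;
    }

    public static void main(String[] args) {
        Properties p = streamsProps();
        System.out.println(p.getProperty("consumer.max.poll.records"));
    }
}
```

Whether these exact values help depends on per-record processing time and state-store restore cost, so they are worth measuring rather than copying as-is.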

> Kafka Consumer Rebalancing takes too long
> -----------------------------------------
>
>                 Key: KAFKA-5868
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5868
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0, 0.10.2.1, 0.11.0.0
>            Reporter: Nandish Kotadia
>
> I have a Kafka Streams application that takes data from a few topics, joins the data, and writes it to another topic.
> *Kafka Configuration:*
> * 5 Kafka brokers
> * Kafka topics: 15 partitions with a replication factor of 3
> A few million records are consumed and produced every hour. Whenever I take any Kafka broker
down, the group goes into rebalancing, which takes approximately 30 minutes or sometimes even
longer, and it kills many of my Kafka Streams processes.
> *Note: my Kafka Streams processes run on the same machines as the Kafka brokers.*
> Does anyone have an idea how to solve this rebalancing issue in the Kafka consumer? It also
frequently throws an exception while rebalancing.
> This is stopping us from going live in a production environment with this setup. Any help
would be appreciated.
> _Caused by: org.apache.kafka.clients.consumer.CommitFailedException:
> Commit cannot be completed since the group has already rebalanced and assigned the partitions
to another member. This means that the time between subsequent calls to poll() was longer
than the configured max.poll.interval.ms, which typically implies that the poll loop is spending
too much time message processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned in poll() with max.poll.records.
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:725)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:604)
> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1173)
> at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:307)
> at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:49)
> at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:268)
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
> at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:362)
> at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:346)
> at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:1118)
> at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
> at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:1110)_
> *Kafka Streams Config:*
> * bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092
> * max.poll.records = 100
> * request.timeout.ms=40000
> The ConsumerConfig it internally creates is:
>     auto.commit.interval.ms = 5000
>     auto.offset.reset = earliest
>     bootstrap.servers = [kafka-1:9092, kafka-2:9092, kafka-3:9092, kafka-4:9092, kafka-5:9092]
>     check.crcs = true
>     client.id = conversion-live-StreamThread-1-restore-consumer
>     connections.max.idle.ms = 540000
>     enable.auto.commit = false
>     exclude.internal.topics = true
>     fetch.max.bytes = 52428800
>     fetch.max.wait.ms = 500
>     fetch.min.bytes = 1
>     group.id = 
>     heartbeat.interval.ms = 3000
>     interceptor.classes = null
>     internal.leave.group.on.close = false
>     isolation.level = read_uncommitted
>     key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>     max.partition.fetch.bytes = 1048576
>     max.poll.interval.ms = 2147483647
>     max.poll.records = 100
>     metadata.max.age.ms = 300000
>     metric.reporters = []
>     metrics.num.samples = 2
>     metrics.recording.level = INFO
>     metrics.sample.window.ms = 30000
>     partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
>     receive.buffer.bytes = 65536
>     reconnect.backoff.max.ms = 1000
>     reconnect.backoff.ms = 50
>     request.timeout.ms = 40000
>     retry.backoff.ms = 100
>     sasl.jaas.config = null
>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>     sasl.kerberos.min.time.before.relogin = 60000
>     sasl.kerberos.service.name = null
>     sasl.kerberos.ticket.renew.jitter = 0.05
>     sasl.kerberos.ticket.renew.window.factor = 0.8
>     sasl.mechanism = GSSAPI
>     security.protocol = PLAINTEXT
>     send.buffer.bytes = 131072
>     session.timeout.ms = 10000
>     ssl.cipher.suites = null
>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>     ssl.endpoint.identification.algorithm = null
>     ssl.key.password = null
>     ssl.keymanager.algorithm = SunX509
>     ssl.keystore.location = null
>     ssl.keystore.password = null
>     ssl.keystore.type = JKS
>     ssl.protocol = TLS
>     ssl.provider = null
>     ssl.secure.random.implementation = null
>     ssl.trustmanager.algorithm = PKIX
>     ssl.truststore.location = null
>     ssl.truststore.password = null
>     ssl.truststore.type = JKS
>     value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
