kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Nandish Kotadia (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (KAFKA-5868) Kafka Consumer Rebalancing takes too long
Date Mon, 11 Sep 2017 07:24:00 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-5868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Nandish Kotadia updated KAFKA-5868:
-----------------------------------
    Description: 
up vote
0
down vote
favorite
1
I have a Kafka Streams Application which takes data from few topics and joins the data and
puts it in another topic.

*Kafka Configuration: *

* 5 kafka brokers
* Kafka Topics - 15 partitions and 3 replication factor.

Few millions of records are consumed/produced every hour. Whenever I take any kafka broker
down, it goes into rebalancing and it takes approx. 30 minutes or sometimes even more for
rebalancing.

Anyone has any idea how to solve rebalancing issue in kafka consumer? Also, many times it
throws exception while rebalancing.

This is stopping us from going live in production environment with this setup. Any help would
be appreciated.

_Caused by: org.apache.kafka.clients.consumer.CommitFailedException: ?
Commit cannot be completed since the group has already rebalanced and assigned the partitions
to another member. This means that the time between subsequent calls to poll() was longer
than the configured max.poll.interval.ms, which typically implies that the poll loop is spending
too much time message processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:725)

at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:604)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1173)
at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:307)
at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:49)
at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:268)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:362)
at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:346)
at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:1118)
at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:1110)_


*Kafka Streams Config: *

* bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092
* max.poll.records = 100
* request.timeout.ms=40000

ConsumerConfig it internally creates is:

    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [kafka-1:9092, kafka-2:9092, kafka-3:9092, kafka-4:9092, kafka-5:9092]
    check.crcs = true
    client.id = conversion-live-StreamThread-1-restore-consumer
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = 
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 2147483647
    max.poll.records = 100
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 40000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

  was:
up vote
0
down vote
favorite
1
I have a Kafka Streams Application which takes data from few topics and joins the data and
puts it in another topic.

Kafka Configuration:

5 kafka brokers
Kafka Topics - 15 partitions and 3 replication factor. 
Few millions of records are consumed/produced every hour. Whenever I take any kafka broker
down, it goes into rebalancing and it takes approx. 30 minutes or sometimes even more for
rebalancing.

Anyone has any idea how to solve rebalancing issue in kafka consumer? Also, many times it
throws exception while rebalancing.

This is stopping us from going live in production environment with this setup. Any help would
be appreciated.

Caused by: org.apache.kafka.clients.consumer.CommitFailedException: ?
Commit cannot be completed since the group has already rebalanced and assigned the partitions
to another member. This means that the time between subsequent calls to poll() was longer
than the configured max.poll.interval.ms, which typically implies that the poll loop is spending
too much time message processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:725)

at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:604)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1173)
at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:307)
at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:49)
at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:268)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:362)
at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:346)
at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:1118)
at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:1110)


Kafka Streams Config:

bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092
max.poll.records = 100
request.timeout.ms=40000

ConsumerConfig it internally creates is:

    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [kafka-1:9092, kafka-2:9092, kafka-3:9092, kafka-4:9092, kafka-5:9092]
    check.crcs = true
    client.id = conversion-live-StreamThread-1-restore-consumer
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = 
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 2147483647
    max.poll.records = 100
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 40000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer


> Kafka Consumer Rebalancing takes too long
> -----------------------------------------
>
>                 Key: KAFKA-5868
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5868
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.0
>            Reporter: Nandish Kotadia
>
> up vote
> 0
> down vote
> favorite
> 1
> I have a Kafka Streams Application which takes data from few topics and joins the data
and puts it in another topic.
> *Kafka Configuration: *
> * 5 kafka brokers
> * Kafka Topics - 15 partitions and 3 replication factor.
> Few millions of records are consumed/produced every hour. Whenever I take any kafka broker
down, it goes into rebalancing and it takes approx. 30 minutes or sometimes even more for
rebalancing.
> Anyone has any idea how to solve rebalancing issue in kafka consumer? Also, many times
it throws exception while rebalancing.
> This is stopping us from going live in production environment with this setup. Any help
would be appreciated.
> _Caused by: org.apache.kafka.clients.consumer.CommitFailedException: ?
> Commit cannot be completed since the group has already rebalanced and assigned the partitions
to another member. This means that the time between subsequent calls to poll() was longer
than the configured max.poll.interval.ms, which typically implies that the poll loop is spending
too much time message processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned in poll() with max.poll.records.
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:725)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:604)
> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1173)
> at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:307)
> at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:49)
> at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:268)
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
> at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:362)
> at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:346)
> at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:1118)
> at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
> at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:1110)_
> *Kafka Streams Config: *
> * bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092
> * max.poll.records = 100
> * request.timeout.ms=40000
> ConsumerConfig it internally creates is:
>     auto.commit.interval.ms = 5000
>     auto.offset.reset = earliest
>     bootstrap.servers = [kafka-1:9092, kafka-2:9092, kafka-3:9092, kafka-4:9092, kafka-5:9092]
>     check.crcs = true
>     client.id = conversion-live-StreamThread-1-restore-consumer
>     connections.max.idle.ms = 540000
>     enable.auto.commit = false
>     exclude.internal.topics = true
>     fetch.max.bytes = 52428800
>     fetch.max.wait.ms = 500
>     fetch.min.bytes = 1
>     group.id = 
>     heartbeat.interval.ms = 3000
>     interceptor.classes = null
>     internal.leave.group.on.close = false
>     isolation.level = read_uncommitted
>     key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>     max.partition.fetch.bytes = 1048576
>     max.poll.interval.ms = 2147483647
>     max.poll.records = 100
>     metadata.max.age.ms = 300000
>     metric.reporters = []
>     metrics.num.samples = 2
>     metrics.recording.level = INFO
>     metrics.sample.window.ms = 30000
>     partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
>     receive.buffer.bytes = 65536
>     reconnect.backoff.max.ms = 1000
>     reconnect.backoff.ms = 50
>     request.timeout.ms = 40000
>     retry.backoff.ms = 100
>     sasl.jaas.config = null
>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>     sasl.kerberos.min.time.before.relogin = 60000
>     sasl.kerberos.service.name = null
>     sasl.kerberos.ticket.renew.jitter = 0.05
>     sasl.kerberos.ticket.renew.window.factor = 0.8
>     sasl.mechanism = GSSAPI
>     security.protocol = PLAINTEXT
>     send.buffer.bytes = 131072
>     session.timeout.ms = 10000
>     ssl.cipher.suites = null
>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>     ssl.endpoint.identification.algorithm = null
>     ssl.key.password = null
>     ssl.keymanager.algorithm = SunX509
>     ssl.keystore.location = null
>     ssl.keystore.password = null
>     ssl.keystore.type = JKS
>     ssl.protocol = TLS
>     ssl.provider = null
>     ssl.secure.random.implementation = null
>     ssl.trustmanager.algorithm = PKIX
>     ssl.truststore.location = null
>     ssl.truststore.password = null
>     ssl.truststore.type = JKS
>     value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message