Date: Mon, 11 Sep 2017 07:24:00 +0000 (UTC)
From: "Nandish Kotadia (JIRA)"
To: jira@kafka.apache.org
Subject: [jira] [Updated] (KAFKA-5868) Kafka Consumer Rebalancing takes too long

     [ https://issues.apache.org/jira/browse/KAFKA-5868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nandish Kotadia updated KAFKA-5868:
-----------------------------------

    Description:

I have a Kafka Streams application that consumes data from a few topics, joins the data, and writes the result to another topic.

*Kafka Configuration:*
* 5 Kafka brokers
* Kafka topics: 15 partitions, replication factor 3

A few million records are consumed/produced every hour. Whenever I take any Kafka broker down, the group goes into rebalancing, and the rebalance takes approximately 30 minutes, sometimes even longer.
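For context, the pipeline described above (consume from several topics, join, write to an output topic) corresponds to a windowed stream-stream join in the Streams DSL. The following is a minimal sketch against the 0.11.0.0 API; the topic names ("orders", "payments", "joined-output"), serdes, and join window are hypothetical, since the report does not include the actual topology. Only the application id ("conversion-live") and broker list are taken from the config dump below.

{code:java}
// Minimal sketch of the described topology (Kafka Streams 0.11.0.0 DSL).
// Topic names, serdes, and the join window are hypothetical.
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class JoinApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "conversion-live");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                "kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092");

        KStreamBuilder builder = new KStreamBuilder();
        // Both inputs must be co-partitioned for a stream-stream join;
        // per the report, the topics all have 15 partitions, so that holds.
        KStream<String, String> left =
                builder.stream(Serdes.String(), Serdes.String(), "orders");
        KStream<String, String> right =
                builder.stream(Serdes.String(), Serdes.String(), "payments");

        // Windowed join; joined records go to the output topic.
        left.join(right,
                        (l, r) -> l + "|" + r,
                        JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
                        Serdes.String(), Serdes.String(), Serdes.String())
                .to(Serdes.String(), Serdes.String(), "joined-output");

        new KafkaStreams(builder, props).start();
    }
}
{code}

A join like this is backed by windowed state stores with changelog topics, and migrated tasks must restore that state after a rebalance; with millions of records per hour, restoration can plausibly dominate the 30-minute rebalance times described (note the {{conversion-live-StreamThread-1-restore-consumer}} client id in the dump below).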
Does anyone have an idea how to solve this rebalancing issue in the Kafka consumer? It also frequently throws an exception while rebalancing. This is stopping us from going live in production with this setup; any help would be appreciated. The exception is shown below, followed by our configuration and a tuning sketch.

{noformat}
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:725)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:604)
	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1173)
	at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:307)
	at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:49)
	at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:268)
	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
	at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
	at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:362)
	at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:346)
	at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:1118)
	at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
	at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:1110)
{noformat}

*Kafka Streams Config:*
* bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092
* max.poll.records=100
* request.timeout.ms=40000

The ConsumerConfig it internally creates is:

{noformat}
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [kafka-1:9092, kafka-2:9092, kafka-3:9092, kafka-4:9092, kafka-5:9092]
check.crcs = true
client.id = conversion-live-StreamThread-1-restore-consumer
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id =
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 100
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 40000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
{noformat}
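The exception's own advice (raise the session timeout or shrink the poll batches) maps onto consumer settings that Streams forwards to the consumers it creates; the max.poll.records = 100 in the dump above arrived that way. A minimal sketch assuming the 0.11.0.0 client, with hypothetical tuning values rather than a verified fix:

{code:java}
// Sketch only: hypothetical tuning values, not a verified fix for this issue.
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class TunedProps {
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "conversion-live");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                "kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092");

        // Consumer-level keys placed in the Streams Properties are passed
        // through to the internal consumers. Fewer records per poll() means
        // less processing between polls, so commits are likelier to land
        // before the group kicks the member out.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        // The exception also suggests increasing the session timeout; this
        // raises it above the 10 s default visible in the dump above.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        return props;
    }
}
{code}

Note that the dump already shows max.poll.interval.ms = 2147483647, i.e. Integer.MAX_VALUE, so that knob is already at its ceiling; of the remedies the exception message names, only the batch size and session timeout remain to tune.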
> Kafka Consumer Rebalancing takes too long
> -----------------------------------------
>
>                 Key: KAFKA-5868
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5868
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.0
>            Reporter: Nandish Kotadia
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)