kafka-dev mailing list archives

From "Ben Isaacs (JIRA)" <j...@apache.org>
Subject [jira] [Created] (KAFKA-7447) Consumer offsets lost during leadership rebalance after bringing node back from clean shutdown
Date Thu, 27 Sep 2018 11:34:00 GMT
Ben Isaacs created KAFKA-7447:
---------------------------------

             Summary: Consumer offsets lost during leadership rebalance after bringing node back from clean shutdown
                 Key: KAFKA-7447
                 URL: https://issues.apache.org/jira/browse/KAFKA-7447
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 2.0.0, 1.1.1
            Reporter: Ben Isaacs


*Summary:*
 * When 1 of my 3 brokers is cleanly shut down, consumption and production continue as normal
thanks to replication. (Consumers are rebalanced to the replicas, and producers are rebalanced
to the remaining brokers.) However, about 10 minutes after the cleanly-shut-down broker comes back,
a flurry of production errors occurs and my consumers suddenly go back in time 2 weeks, causing
a long outage (12 hours+) as all messages are replayed on some topics.
 * The hypothesis is that the auto leadership rebalance happens too quickly after the
downed broker returns, before it has had a chance to become fully synchronised on all partitions.
In particular, it seems that having committed consumer offsets ahead of the most recent data on the
topic that consumer was following causes the consumer to be reset to offset 0 (see the consumer
sketch just below this list).
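For context, here is a minimal sketch of the consumer-side behaviour that I believe produces the
"go back in time 2 weeks" symptom. This is not our actual application code; the bootstrap server,
group id and topic name are placeholders. The key point is the auto.offset.reset=earliest fallback,
which kicks in once the committed offset no longer points at data that exists on the broker.

{code:java}
// Minimal sketch, NOT our production consumer: bootstrap server, group id and
// topic are placeholders. It only illustrates the failure mode: when the data a
// committed offset points at vanishes broker-side (e.g. the partition is
// truncated to 0), the next fetch is out of range and the consumer falls back
// to auto.offset.reset -- with "earliest" it re-reads everything still inside
// the 14-day topic retention.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class OffsetResetSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "prod-kafka-1:9092");   // placeholder
        props.put("group.id", "example-consumer-group");       // placeholder
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "earliest");            // fallback used when the committed
                                                                // offset is out of range
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-user-topic")); // placeholder
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("%s-%d@%d%n",
                            record.topic(), record.partition(), record.offset());
                }
            }
        }
    }
}
{code}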

*Expected:*
 * bringing a node back from a clean shutdown does not cause any consumers to reset to 0.

*Actual:*
 * I experience approximately 12 hours of partial outage, triggered at the point that the auto
leadership rebalance occurs after the cleanly shut down node returns.

*Workaround:*
 * disable auto leadership rebalance entirely.
 * manually rebalance from time to time, once all nodes and all partitions are fully replicated
(see the sketch below).
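Concretely, the workaround amounts to setting the following in server.properties on each broker
(a sketch of what we do, not an exact copy of our deployment scripts):

{{auto.leader.rebalance.enable=false}}

and then, once every partition's ISR is complete, triggering the preferred replica election manually
with the tool that ships with the broker, along the lines of:

{{bin/kafka-preferred-replica-election.sh --zookeeper prod-kafka-1:2181}}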

*My Setup:*
 * Kafka deployment with 3 brokers and 2 topics.
 * Replication factor is 3, for all topics.
 * min.isr is 2, for all topics.
 * Zookeeper deployment with 3 instances.
 * In the region of 10 to 15 consumers, with 2 user topics (and, of course, the system
topics such as __consumer_offsets). __consumer_offsets has the standard 50 partitions. The user
topics have about 3000 partitions in total.
 * Offset retention time of 7 days, and topic retention time of 14 days.
 * Input rate ~1000 messages/sec.
 * Deployment happens to be on Google Compute Engine.

*Related Stack Overflow Post:* [https://stackoverflow.com/questions/52367825/apache-kafka-loses-some-consumer-offsets-when-when-i-bounce-a-broker]

It was suggested that I open a ticket by "Muir", who says they have also experienced this.

*Transcription of logs, showing the problem:*

Below you can see chronologically sorted, interleaved logs from the 3 brokers. prod-kafka-2
is the node which was cleanly shut down and then restarted. I filtered the messages to only
those regarding __consumer_offsets-29, because it's just too much to paste otherwise.


||Broker host||Broker ID||
|prod-kafka-1|0|
|prod-kafka-2|1 (this one was restarted)|
|prod-kafka-3|2|


prod-kafka-2: (just starting up)
{quote}{{[2018-09-17 09:21:46,246] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0]
Based on follower's leader epoch, leader replied with an unknown offset in __consumer_offsets-29.
The initial fetch offset 0 will be used for truncation. (kafka.server.ReplicaFetcherThread)}}
{quote}
prod-kafka-3: (sees replica1 come back)
{quote}{{[2018-09-17 09:22:02,027] INFO [Partition __consumer_offsets-29 broker=2] Expanding
ISR from 0,2 to 0,2,1 (kafka.cluster.Partition)}}
{quote}
prod-kafka-2:
{quote}{{[2018-09-17 09:22:33,892] INFO [GroupMetadataManager brokerId=1] Scheduling unloading
of offsets and group metadata from __consumer_offsets-29 (kafka.coordinator.group.GroupMetadataManager)}}
{{ [2018-09-17 09:22:33,902] INFO [GroupMetadataManager brokerId=1] Finished unloading __consumer_offsets-29.
Removed 0 cached offsets and 0 cached groups. (kafka.coordinator.group.GroupMetadataManager)}}
{{ [2018-09-17 09:24:03,287] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for
partitions __consumer_offsets-29 (kafka.server.ReplicaFetcherManager)}}
{{ [2018-09-17 09:24:03,287] INFO [Partition __consumer_offsets-29 broker=1] __consumer_offsets-29
starts at Leader Epoch 78 from offset 0. Previous Leader Epoch was: 77 (kafka.cluster.Partition)}}
{{ [2018-09-17 09:24:03,287] INFO [GroupMetadataManager brokerId=1] Scheduling loading of
offsets and group metadata from __consumer_offsets-29 (kafka.coordinator.group.GroupMetadataManager)}}
{{ [2018-09-17 09:24:03,288] INFO [GroupMetadataManager brokerId=1] Finished loading offsets
and group metadata from __consumer_offsets-29 in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)}}
{quote}
prod-kafka-3: struggling to agree with prod-kafka-2. Kicks it out of ISR, but then fights
with ZooKeeper. Perhaps 2 and 3 both think they're leader?
{quote}{{[2018-09-17 09:24:15,372] INFO [Partition __consumer_offsets-29 broker=2] Shrinking
ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)}}
{{ [2018-09-17 09:24:15,377] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion
[1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)}}
{quote}
prod-kafka-2: rudely kicks BOTH of the other two replicas out of the ISR list, even though
prod-kafka-2 is the one we just restarted and is therefore the most likely to be behind. (Bear in
mind that it has already decided to truncate the partition to 0!)
{quote}{{[2018-09-17 09:24:16,481] INFO [Partition __consumer_offsets-29 broker=1] Shrinking
ISR from 0,2,1 to 1 (kafka.cluster.Partition)}}
{quote}
prod-kafka-3: still fighting with ZooKeeper. Eventually loses.
{quote}{{[2018-09-17 09:24:20,374] INFO [Partition __consumer_offsets-29 broker=2] Shrinking
ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)}}
{{ [2018-09-17 09:24:20,378] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion
[1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)}}
{{ [2018-09-17 09:24:25,347] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR
from 0,2,1 to 0,2 (kafka.cluster.Partition)}}
{{ [2018-09-17 09:24:25,350] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion
[1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)}}
{{ [2018-09-17 09:24:30,359] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR
from 0,2,1 to 0,2 (kafka.cluster.Partition)}}
{{ [2018-09-17 09:24:30,362] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion
[1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)}}
{{ [2018-09-17 09:24:35,365] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR
from 0,2,1 to 0,2 (kafka.cluster.Partition)}}
{{ [2018-09-17 09:24:35,368] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion
[1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)}}
{{ [2018-09-17 09:24:40,352] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR
from 0,2,1 to 0,2 (kafka.cluster.Partition)}}
{{ [2018-09-17 09:24:40,354] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion
[1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)}}
{{ [2018-09-17 09:24:45,422] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR
from 0,2,1 to 0,2 (kafka.cluster.Partition)}}
{{ [2018-09-17 09:24:45,425] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion
[1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)}}
{{ [2018-09-17 09:24:50,345] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR
from 0,2,1 to 0,2 (kafka.cluster.Partition)}}
{{ [2018-09-17 09:24:50,348] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion
[1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)}}
{{ [2018-09-17 09:24:55,444] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR
from 0,2,1 to 0,2 (kafka.cluster.Partition)}}
{{ [2018-09-17 09:24:55,449] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion
[1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)}}
{{ [2018-09-17 09:25:00,340] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR
from 0,2,1 to 0,2 (kafka.cluster.Partition)}}
{{ [2018-09-17 09:25:00,343] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion
[1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)}}
{{ [2018-09-17 09:25:05,374] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR
from 0,2,1 to 0,2 (kafka.cluster.Partition)}}
{{ [2018-09-17 09:25:05,377] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion
[1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)}}
{{ [2018-09-17 09:25:10,342] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR
from 0,2,1 to 0,2 (kafka.cluster.Partition)}}
{{ [2018-09-17 09:25:10,344] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion
[1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)}}
{{ [2018-09-17 09:25:15,348] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR
from 0,2,1 to 0,2 (kafka.cluster.Partition)}}
{{ [2018-09-17 09:25:15,351] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion
[1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)}}
{{ [2018-09-17 09:25:20,338] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR
from 0,2,1 to 0,2 (kafka.cluster.Partition)}}
{{ [2018-09-17 09:25:20,340] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion
[1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)}}
{{ [2018-09-17 09:25:25,338] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR
from 0,2,1 to 0,2 (kafka.cluster.Partition)}}
{{ [2018-09-17 09:25:25,340] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion
[1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)}}
{{ [2018-09-17 09:25:30,382] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR
from 0,2,1 to 0,2 (kafka.cluster.Partition)}}
{{ [2018-09-17 09:25:30,387] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion
[1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)}}
{{ [2018-09-17 09:25:35,341] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR
from 0,2,1 to 0,2 (kafka.cluster.Partition)}}
{{ [2018-09-17 09:25:35,344] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion
[1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)}}
{{ [2018-09-17 09:25:40,460] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR
from 0,2,1 to 0,2 (kafka.cluster.Partition)}}
{{ [2018-09-17 09:25:40,465] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion
[1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)}}
{{ [2018-09-17 09:25:45,335] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR
from 0,2,1 to 0,2 (kafka.cluster.Partition)}}
{{ [2018-09-17 09:25:45,338] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion
[1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)}}
{quote}

prod-kafka-1: suddenly gets confused and also re-inits to 0, as prod-kafka-2 apparently becomes
leader.
{quote}{{[2018-09-17 09:25:48,807] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0]
Remote broker is not the leader for partition __consumer_offsets-29, which could indicate
that the partition is being moved (kafka.server.ReplicaFetcherThread)}}
{quote}

prod-kafka-3: finally decides that prod-kafka-2 is in charge, and truncates accordingly.
{quote}{{[2018-09-17 09:25:48,806] INFO [ReplicaFetcherManager on broker 2] Removed fetcher
for partitions __consumer_offsets-29 (kafka.server.ReplicaFetcherManager)}}
{{ [2018-09-17 09:25:48,807] INFO [ReplicaFetcherManager on broker 2] Added fetcher for partitions
List([__consumer_offsets-29, initOffset 0 to broker BrokerEndPoint(1,prod-kafka-2.c.i-lastfm-prod.internal,9092)]
) (kafka.server.ReplicaFetcherManager)}}
{{ [2018-09-17 09:25:48,809] INFO [GroupMetadataManager brokerId=2] Scheduling unloading of
offsets and group metadata from __consumer_offsets-29 (kafka.coordinator.group.GroupMetadataManager)}}
{{ [2018-09-17 09:25:48,810] INFO [GroupMetadataManager brokerId=2] Finished unloading __consumer_offsets-29.
Removed 0 cached offsets and 0 cached groups. (kafka.coordinator.group.GroupMetadataManager)}}
{{ [2018-09-17 09:25:48,950] WARN [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Based
on follower's leader epoch, leader replied with an unknown offset in __consumer_offsets-29.
The initial fetch offset 0 will be used for truncation. (kafka.server.ReplicaFetcherThread)}}
{{ [2018-09-17 09:25:48,951] INFO [Log partition=__consumer_offsets-29, dir=/var/lib/kafka/data]
Truncating to 0 has no effect as the largest offset in the log is -1 (kafka.log.Log)}}
{quote}

prod-kafka-1: leadership inauguration confirmed.
{quote}{{[2018-09-17 09:25:50,207] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0]
Remote broker is not the leader for partition __consumer_offsets-29, which could indicate
that the partition is being moved (kafka.server.ReplicaFetcherThread)}}
{quote}
prod-kafka-2: now that it has asserted its dominance via ZooKeeper, prod-kafka-3 is added back to
the ISR list.
{quote}{{[2018-09-17 09:25:50,210] INFO [Partition __consumer_offsets-29 broker=1] Expanding
ISR from 1 to 1,2 (kafka.cluster.Partition)}}
{quote}
prod-kafka-1: still struggling to accept reality, but eventually also truncates to 0.
{quote}{{[2018-09-17 09:25:51,430] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0]
Remote broker is not the leader for partition __consumer_offsets-29, which could indicate
that the partition is being moved (kafka.server.ReplicaFetcherThread)}}
{{ [2018-09-17 09:25:52,615] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Remote
broker is not the leader for partition __consumer_offsets-29, which could indicate that the
partition is being moved (kafka.server.ReplicaFetcherThread)}}
{{ [2018-09-17 09:25:53,637] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Remote
broker is not the leader for partition __consumer_offsets-29, which could indicate that the
partition is being moved (kafka.server.ReplicaFetcherThread)}}
{{ [2018-09-17 09:25:54,150] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for
partitions __consumer_offsets-29 (kafka.server.ReplicaFetcherManager)}}
{{ [2018-09-17 09:25:54,151] INFO [ReplicaFetcherManager on broker 0] Added fetcher for partitions
List([__consumer_offsets-29, initOffset 0 to broker BrokerEndPoint(1,prod-kafka-2.c.i-lastfm-prod.internal,9092)]
) (kafka.server.ReplicaFetcherManager)}}
{{ [2018-09-17 09:25:54,151] INFO [GroupMetadataManager brokerId=0] Scheduling unloading of
offsets and group metadata from __consumer_offsets-29 (kafka.coordinator.group.GroupMetadataManager)}}
{{ [2018-09-17 09:25:54,153] INFO [GroupMetadataManager brokerId=0] Finished unloading __consumer_offsets-29.
Removed 0 cached offsets and 0 cached groups. (kafka.coordinator.group.GroupMetadataManager)}}
{{ [2018-09-17 09:25:54,261] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Based
on follower's leader epoch, leader replied with an unknown offset in __consumer_offsets-29.
The initial fetch offset 0 will be used for truncation. (kafka.server.ReplicaFetcherThread)}}
{{ [2018-09-17 09:25:54,261] INFO [Log partition=__consumer_offsets-29, dir=/var/lib/kafka/data]
Truncating to 0 has no effect as the largest offset in the log is -1 (kafka.log.Log)}}
{quote}
prod-kafka-2: completes its coup of consumer offsets; everything is now at offset 0.
{quote}{{[2018-09-17 09:25:56,244] INFO [Partition __consumer_offsets-29 broker=1] Expanding
ISR from 1,2 to 1,2,0 (kafka.cluster.Partition)}}
{quote}
 

Retrospectively, I realise that I have omitted any logs to do with the leadership rebalance itself.
Nevertheless, as mentioned before, the problem is consistently reproducible, and it is also easy to
work around by disabling the leadership rebalance entirely.
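For completeness, here is a rough sketch of the check we run before manually rebalancing, i.e.
verifying that every partition's ISR matches its full replica set. It uses the Java AdminClient;
the bootstrap address is a placeholder and this is an illustration rather than our exact tooling.

{code:java}
// Rough sketch (placeholder bootstrap address, not our exact tooling):
// list all topics and report any partition whose ISR is smaller than its
// replica set, so the manual preferred-leader rebalance is only run once
// everything is fully replicated.
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

public class IsrCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "prod-kafka-1:9092");   // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            Map<String, TopicDescription> topics =
                    admin.describeTopics(admin.listTopics().names().get()).all().get();

            boolean allInSync = true;
            for (TopicDescription topic : topics.values()) {
                for (TopicPartitionInfo p : topic.partitions()) {
                    if (p.isr().size() < p.replicas().size()) {
                        allInSync = false;
                        System.out.printf("%s-%d under-replicated: isr=%d replicas=%d%n",
                                topic.name(), p.partition(), p.isr().size(), p.replicas().size());
                    }
                }
            }
            System.out.println(allInSync
                    ? "All partitions fully replicated; safe to rebalance leadership."
                    : "Some partitions are not fully replicated; do NOT rebalance yet.");
        }
    }
}
{code}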

 

*Configuration:*


*kafka server.properties file:*

{{broker.id=1}}
{{ default.replication.factor=3}}
{{ auto.create.topics.enable=false}}
{{ min.insync.replicas=2}}
{{ num.network.threads=12}}
{{ num.io.threads=16}}
{{ num.replica.fetchers=6}}
{{ socket.send.buffer.bytes=102400}}
{{ socket.receive.buffer.bytes=102400}}
{{ socket.request.max.bytes=104857600}}
{{ log.dirs=/var/lib/kafka/data}}
{{ num.partitions=1}}
{{ num.recovery.threads.per.data.dir=4}}
{{ offsets.retention.minutes=10080}}
{{ offsets.topic.replication.factor=3}}
{{ transaction.state.log.replication.factor=3}}
{{ transaction.state.log.min.isr=2}}
{{ log.flush.interval.messages=20000}}
{{ log.flush.interval.ms=10000}}
{{ log.retention.hours=168}}
{{ log.segment.bytes=1073741824}}
{{ log.retention.check.interval.ms=60000}}
{{ zookeeper.connect=prod-kafka-1:2181,prod-kafka-2:2181,prod-kafka-3:2181}}
{{ zookeeper.connection.timeout.ms=6000}}
{{ confluent.support.metrics.enable=false}}
{{ confluent.support.customer.id=anonymous}}
{{ group.initial.rebalance.delay.ms=3000}}

 

*zookeeper.properties file:*

{{tickTime=2000}}
{{ initLimit=10}}
{{ syncLimit=5}}
{{ dataDir=/var/lib/zookeeper}}
{{ clientPort=2181}}
{{ server.1=prod-kafka-1:2888:3888}}
{{ server.2=prod-kafka-2:2888:3888}}
{{ server.3=prod-kafka-3:2888:3888}}
{{ autopurge.purgeInterval=12}}
{{ autopurge.snapRetainCount=6}}

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
