kafka-dev mailing list archives

From "Jun Rao (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-3919) Broker fails to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs
Date Wed, 06 Jul 2016 20:43:11 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15365046#comment-15365046 ]

Jun Rao commented on KAFKA-3919:

[~BigAndy], yes, you are right. We actually do index based on the first offset in a compressed
messageSet during recovery, so the hypothesized scenario can indeed happen. It seems that you
lost multiple brokers in a hard way at the same time; was that due to a power outage?

To fix this properly, we probably want to fix KAFKA-1211. This is a bit involved since we
need to keep track of the leader generations in the log. However, once we can do that, if the
same situation occurs where a follower wants to fetch from an offset that has been overwritten
with new messages in the leader, the follower will know those messages are from a newer generation
of the leader and will truncate its local log to the correct offset.
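To make the idea concrete, here is a toy sketch of generation-based truncation. This is illustrative Java only, not broker code: the epoch bookkeeping, method names, and the generation numbers are invented for the example; the offsets are taken from the report below.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch (not Kafka source) of the leader-generation idea behind
// KAFKA-1211: the log records which generation started writing at which
// offset. On restart, a follower asks the leader where the follower's latest
// generation ended and truncates anything it wrote past that point, instead
// of fetching blindly from its own recovered log end.
public class GenerationTruncation {
    static class EpochEntry {
        final int generation; final long startOffset;
        EpochEntry(int generation, long startOffset) {
            this.generation = generation; this.startOffset = startOffset;
        }
    }

    // Leader side: the offset at which `generation` ended, i.e. where the
    // next generation began, or the leader's log end if none followed.
    static long endOffsetForGeneration(List<EpochEntry> leaderEpochs,
                                       long leaderLogEnd, int generation) {
        for (EpochEntry e : leaderEpochs)
            if (e.generation > generation) return e.startOffset;
        return leaderLogEnd;
    }

    // Follower side: truncate to whichever is smaller, its own log end or
    // the end the leader reports for the follower's latest generation.
    static long truncationOffset(long followerLogEnd, long leaderReportedEnd) {
        return Math.min(followerLogEnd, leaderReportedEnd);
    }

    public static void main(String[] args) {
        // Numbers from the report: new leader 2011 recovered to 1239740602;
        // follower 2012 recovered 2,228 offsets beyond that, all written
        // under the previous generation.
        List<EpochEntry> leaderEpochs = Arrays.asList(
                new EpochEntry(1, 0L),
                new EpochEntry(2, 1239740602L)); // new generation post-outage
        long leaderEnd = endOffsetForGeneration(leaderEpochs, 1239742900L, 1);
        long truncateTo = truncationOffset(1239742830L, leaderEnd);
        System.out.println("follower truncates to " + truncateTo);
    }
}
```

With generation tracking, the follower truncates back to 1239740602 before fetching, so it never holds offsets the new leader has overwritten.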

Another thing that's a bit weird right now is that the leader indexes based on the first
offset in a compressed message set, while the follower indexes based on the last offset.
This is mostly to avoid decompression on the follower side, since we only store the last
offset in the wrapper message. Indexing either the first or the last offset is semantically
correct, but it would be good to make this consistent. The next time we evolve the message
format, we probably want to consider adding the first offset to the wrapper message.
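The first-vs-last inconsistency can be sketched as follows (illustrative Java, not Kafka source; the class and method names are invented). Both index choices satisfy the index contract, yet the two brokers end up with different index entries for identical log bytes:

```java
import java.util.TreeMap;

// Illustrative sketch (not Kafka source). A compressed message set is one
// wrapper record covering a range of offsets; an index entry may point at
// either end of that range. Both choices satisfy the index contract (the
// greatest indexed offset <= target gives a position at or before the
// target), but a leader indexing first offsets and a follower indexing
// last offsets build different index files for identical log contents.
public class BatchIndexing {
    // A compressed set covering offsets [first, last] at a byte position.
    static class Batch {
        final long first, last; final int position;
        Batch(long first, long last, int position) {
            this.first = first; this.last = last; this.position = position;
        }
    }

    // Index key a leader picks during recovery: the set's first offset.
    static long leaderIndexKey(Batch b)   { return b.first; }

    // Index key a follower picks: the wrapper (last) offset, readable
    // without decompressing the set.
    static long followerIndexKey(Batch b) { return b.last; }

    // Lookup: byte position of the greatest indexed offset <= target,
    // or 0 (scan from the start of the segment) if none.
    static int floorPosition(TreeMap<Long, Integer> index, long target) {
        var e = index.floorEntry(target);
        return e == null ? 0 : e.getValue();
    }

    public static void main(String[] args) {
        Batch b = new Batch(100, 104, 4096);
        TreeMap<Long, Integer> leaderIdx = new TreeMap<>();
        leaderIdx.put(leaderIndexKey(b), b.position);     // key 100
        TreeMap<Long, Integer> followerIdx = new TreeMap<>();
        followerIdx.put(followerIndexKey(b), b.position); // key 104
        // Both indexes resolve a lookup correctly, but their entries differ.
        System.out.println(floorPosition(leaderIdx, 102));   // 4096
        System.out.println(floorPosition(followerIdx, 102)); // 0
    }
}
```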

> Broker fails to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs
> ------------------------------------------------------------------------------------------------------
>                 Key: KAFKA-3919
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3919
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions:
>            Reporter: Andy Coates
> Hi All,
> I encountered an issue with Kafka following a power outage that saw a proportion of our cluster disappear. When the power came back on, several brokers halted on startup with the following error:
> {noformat}
> 	"Fatal error during KafkaServerStartable startup. Prepare to shutdown"
> 	kafka.common.InvalidOffsetException: Attempt to append an offset (1239742691) to position
35728 no larger than the last offset appended (1239742822) to /data3/kafka/mt_xp_its_music_main_itsevent-20/00000000001239444214.index.
> 	at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
> 	at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
> 	at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
> 	at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
> 	at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
> 	at kafka.log.LogSegment.recover(LogSegment.scala:188)
> 	at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
> 	at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
> 	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> 	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> 	at kafka.log.Log.loadSegments(Log.scala:160)
> 	at kafka.log.Log.<init>(Log.scala:90)
> 	at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
> 	at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only way to recover the brokers was to delete the log files that contained non-monotonically incrementing offsets.
> We've spent some time digging through the logs, and I feel I may have worked out the sequence of events leading to this issue (though this is based on some assumptions I've made about the way Kafka works, which may be wrong).
> First off, we have unclean leadership elections disabled. (We did later enable them to help get around some other issues we were having, but this was several hours after this issue manifested.) We're producing to the topic with gzip compression and acks=1.
> We looked through the data logs that were causing the brokers not to start. We found that the initial part of the log has monotonically increasing offsets, where each compressed batch normally contains one or two records. Then there is a batch that contains many records, whose first record has an offset below the previous batch and whose last record has an offset above the previous batch. Following on from this there continues a period of large batches with monotonically increasing offsets, and then the log returns to batches with one or two records.
> Our working assumption here is that the period before the offset dip, with the small batches, is pre-outage normal operation. The period of larger batches is from just after the outage, when producers have a backlog to process as the partition becomes available again, and then things return to normal batch sizes once the backlog clears.
> We also looked through Kafka's application logs to try and piece together the series of events leading up to this. Here’s what we know happened, from the logs, with regard to one partition that has issues:
> Prior to outage:
> * Replicas for the partition are brokers 2011, 2012, 2024, with 2024 being the preferred leader
> * Producers using acks=1, compression=gzip
> * Brokers configured with unclean.elections=false, zk.session-timeout=36s
> Post outage:
> * 2011 comes up first, (also as the Controller), recovers unflushed log segment 1239444214,
completes load with offset 1239740602, and becomes leader of the partition.
> * 2012 comes up next, recovers unflushed log segment 1239444214, truncates to offset 1239742830 (that's 2,228 records ahead of the recovered offset of the current leader), and starts following.
> * 2024 comes up quickly after 2012, recovers unflushed log segment 1239444214, truncates to offset 1239742250 (that's 1,648 records ahead of the recovered offset of the current leader), and starts following.
> * The Controller adds 2024 to the replica set just before 2024 halts due to another partition
having an offset greater than the leader.
> * The Controller adds 2012 to the replica set just before 2012 halts due to another partition
having an offset greater than the leader.
> * When 2012 is next restarted, it fails to fully start as it's complaining of invalid offsets in the log.
> You’ll notice that the offsets the brokers truncate to are different for each of the three brokers.
> We're assuming that when 2012 starts up and follows the leader, it requests records from its truncated offset, but that the logs have diverged on these two brokers to the point that the requested offset corresponds, within the leader's log, to the middle of a compressed record set, not a record-set boundary. The leader then returns the whole compressed set, which the follower appends to its log, unknowingly introducing a dip in its otherwise monotonically incrementing offsets.
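That hypothesized mechanism can be shown in a toy simulation (illustrative Java only, not broker code; the batch boundaries and offsets here are made up for the example):

```java
import java.util.Arrays;
import java.util.List;

// Toy simulation of the hypothesis above (not Kafka source). The leader's
// log is a list of compressed batches; a fetch from an offset that falls
// inside a batch returns the whole batch. A follower that truncated to a
// mid-batch offset then appends a batch whose first offset is *below* its
// own log end, producing the non-monotonic "dip".
public class DivergenceDip {
    static class Batch {
        final long first, last;
        Batch(long first, long last) { this.first = first; this.last = last; }
    }

    // Leader returns the entire batch containing the requested offset.
    static Batch fetch(List<Batch> leaderLog, long fetchOffset) {
        for (Batch b : leaderLog)
            if (fetchOffset >= b.first && fetchOffset <= b.last) return b;
        throw new IllegalArgumentException("offset out of range");
    }

    public static void main(String[] args) {
        List<Batch> leaderLog = Arrays.asList(
                new Batch(100, 101),   // small pre-outage batch
                new Batch(102, 150));  // large post-outage batch
        long followerLogEnd = 120;     // follower truncated to mid-batch
        Batch fetched = fetch(leaderLog, followerLogEnd);
        // Follower appends the whole batch: first offset 102 < log end 120.
        boolean dip = fetched.first < followerLogEnd;
        System.out.println("offset dip introduced: " + dip);
    }
}
```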
> Several of our brokers were unlucky enough to have this dip at the 4K boundary used by
the offset indexer, causing a protracted outage.  We’ve written a little utility that shows
several more brokers have a dip outside of the 4K boundary.
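In outline, a dip check of the kind mentioned above might look like this (a minimal sketch; the actual utility is not shown in this report):

```java
import java.util.Arrays;
import java.util.List;

// Minimal sketch of a dip detector (hypothetical, not the reporter's actual
// tool): walk the batches of a segment and report any batch whose first
// offset is not strictly greater than the previous batch's last offset.
public class DipScanner {
    static class Batch {
        final long first, last;
        Batch(long first, long last) { this.first = first; this.last = last; }
    }

    // Returns the index of the first non-monotonic batch, or -1 if none.
    static int findDip(List<Batch> batches) {
        long prevLast = -1;
        for (int i = 0; i < batches.size(); i++) {
            if (batches.get(i).first <= prevLast) return i;
            prevLast = batches.get(i).last;
        }
        return -1;
    }

    public static void main(String[] args) {
        List<Batch> healthy = Arrays.asList(new Batch(0, 1), new Batch(2, 5));
        List<Batch> dipped  = Arrays.asList(new Batch(0, 120), new Batch(102, 150));
        System.out.println(findDip(healthy)); // -1
        System.out.println(findDip(dipped));  // 1
    }
}
```

A dip is only fatal at startup when it happens to straddle the 4K indexing boundary, which is why such a scan can find affected segments on brokers that still start cleanly.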
> There are some assumptions in there, which I’ve not got around to confirming / denying.
(A quick attempt to recreate this failed and I've not found the time to invest more).
> Of course I'd really appreciate the community / experts stepping in and commenting on
whether our assumptions are right or wrong, or if there is another explanation to the problem.

> Obviously, the fact that the broker got into this state and then won’t start is a bug, and one I’d like to fix. A Kafka broker should not corrupt its own log during normal operation to the point that it can’t restart! :D
> A secondary issue is whether we think the divergent logs are acceptable. This may be deemed acceptable given the producers have chosen availability over consistency by producing with acks=1. Though personally, the system having diverging replicas of an immutable commit log just doesn't sit right with me.
> I see us having a few options here:
> * Have the replicas detect the divergence of their logs, e.g. a follower compares the checksum of its last record with the record at the same offset on the leader. The follower can then work out that its log has diverged from the leader. At that point it could either halt, stop replicating that partition, or search backwards to find the point of divergence, truncate, and recover (possibly saving the truncated part somewhere). This would be a protocol change for Kafka. This solution trades availability (you’ve got fewer ISRs during the extended re-sync process) for consistency.
> * Leave the logs as they are and have the indexing of offsets in the log on startup handle such a situation gracefully. This leaves logs in a divergent state between replicas (meaning replays would yield different messages if the leader were to go down), but gives better availability (no time spent not being an ISR while it repairs any divergence).
> * Support multiple options and allow this to be tuned, ideally per topic.
> * Something else...
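The divergence check in the first option could be sketched roughly as follows (a hypothetical protocol, not existing Kafka code; the in-memory maps stand in for the two logs):

```java
import java.util.Map;
import java.util.zip.CRC32;

// Sketch of the checksum-comparison option above (hypothetical protocol,
// not Kafka code): the follower compares the CRC of its record at a given
// offset with the leader's record at the same offset, walking backwards
// until they match; everything after that point is divergent and would be
// truncated (and possibly saved aside).
public class DivergenceProbe {
    static long crc(byte[] payload) {
        CRC32 c = new CRC32();
        c.update(payload);
        return c.getValue();
    }

    // Highest offset at which both logs hold a matching record, or -1.
    static long lastCommonOffset(Map<Long, byte[]> follower,
                                 Map<Long, byte[]> leader, long fromOffset) {
        for (long o = fromOffset; o >= 0; o--) {
            byte[] f = follower.get(o), l = leader.get(o);
            if (f != null && l != null && crc(f) == crc(l)) return o;
        }
        return -1;
    }

    public static void main(String[] args) {
        Map<Long, byte[]> leader = Map.of(
                0L, "a".getBytes(), 1L, "b".getBytes(), 2L, "x".getBytes());
        Map<Long, byte[]> follower = Map.of(
                0L, "a".getBytes(), 1L, "b".getBytes(), 2L, "y".getBytes());
        long common = lastCommonOffset(follower, leader, 2);
        System.out.println("truncate follower to offset " + (common + 1));
    }
}
```

A real implementation would presumably probe at batch granularity and binary-search rather than walk back record by record, since each comparison is a network round trip.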
> I’m happy/keen to contribute here, but I’d like to first discuss which option should be investigated.
> Andy

This message was sent by Atlassian JIRA
