kafka-dev mailing list archives

From "Andy Coates (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-3919) Broker faills to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs
Date Tue, 05 Jul 2016 15:04:11 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362604#comment-15362604

Andy Coates commented on KAFKA-3919:

Hi [~junrao], thanks for taking the time to look at this.

First off, we have unclean leader elections disabled. (We did later enable them to help
get around some other issues we were having, but this was several hours after this issue manifested.)

We did look through the data logs that were causing the brokers to not start. What we found before
the incident was a monotonically increasing offset, where each compressed batch normally contained
one or two records. Then there is a batch that contains many records, whose first record has
an offset below the previous batch and whose last record has an offset above the previous
batch. Following on from this there continues a period of large batches, with monotonically
increasing offsets, and then the log returns to batches with one or two records.
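
A minimal sketch of how such a dip could be found mechanically, assuming each compressed batch has already been decoded into a (first offset, last offset) pair. The function name and the sample offsets are hypothetical and are not taken from the real logs or from Kafka's code.

```python
from typing import List, Tuple

Batch = Tuple[int, int]  # (first_offset, last_offset) of one compressed batch


def find_offset_dips(batches: List[Batch]) -> List[int]:
    """Return indexes of batches whose first offset is not above the previous batch's last offset."""
    dips = []
    for i in range(1, len(batches)):
        prev_last = batches[i - 1][1]
        first, _last = batches[i]
        if first <= prev_last:
            dips.append(i)
    return dips


# Hypothetical offsets: small batches, then one large batch that starts below
# the previous batch's last offset and ends above it, then larger batches with
# increasing offsets, then small batches again.
batches = [(100, 101), (102, 102), (103, 104), (98, 140), (141, 175), (176, 177)]
print(find_offset_dips(batches))  # index of the batch where the offsets dip
```

Only the boundary between adjacent batches is checked here, which matches the shape described above: one batch that starts too low, followed by batches that increase normally again.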

Our working assumption here is that the period before the offset dip is pre-outage normal
operation. The period of larger batches is from just after the outage, where producers have
a backlog to process when the partition becomes available, and then things return to normal
batch sizes again once the backlog clears.

We did also look through Kafka's application logs to try to piece together the series
of events leading up to this.

Here’s what I know happened, from the logs, with regard to one partition that has issues:

Prior to outage:
Replicas for the partition are brokers 2011, 2012,  2024, with 2024 being the preferred leader.
Producers using acks=1, compression=gzip
Brokers configured with unclean.elections=false, zk.session-timeout=36s

Post outage:
2011 comes up first (also as the Controller), recovers unflushed log segment 1239444214,
completes load with offset 1239740602, and becomes leader of the partition.
2012 comes up next, recovers its log, recovers unflushed log segment 1239444214, truncates
to offset 1239742830 (that's 2,228 records ahead of the recovered offset of the current leader),
and starts following.
2024 comes up quickly after 2012, recovers unflushed log segment 1239444214, truncates to
offset 1239742250 (that's 1,648 records ahead of the recovered offset of the current leader),
and starts following.
The Controller adds 2024 to the replica set just before 2024 halts due to another partition
having an offset greater than the leader.
The Controller adds 2012 to the replica set just before 2012 halts due to another partition
having an offset greater than the leader.
When 2012 is next restarted, it fails to fully start as it's complaining of invalid offsets
in the log.
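
The "records ahead" figures in the timeline above follow directly from the offsets quoted there. A small worked check, using only those numbers (the variable names are illustrative):

```python
# Offset at which broker 2011 (the new leader) completed its load.
leader_recovered = 1239740602

# Offsets the two followers truncated to after recovering the same segment.
follower_truncated = {
    2012: 1239742830,
    2024: 1239742250,
}

# How far each follower's truncation point sits beyond the leader's recovered offset.
lead = {broker: offset - leader_recovered for broker, offset in follower_truncated.items()}

for broker, ahead in lead.items():
    print(f"broker {broker} is {ahead:,} records ahead of the leader")
```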

You’ll notice that the offsets the brokers truncate to are different for each of the three

Given that I can write to the partition with only one broker available, and that I can then
take this broker down and bring up a different one from the replica set and write to that
one, how does Kafka currently look to reconcile these different histories when the first node
is brought back online?  I know that if the first node has a greater offset it will halt when
it tries to follow the second, but what happens if the first node has a lower offset?

Maybe the above scenario is correctly handled and I’m off on a tangent! (I’d appreciate
any info to improve my understanding of Kafka and help me figure out what is happening here.)
I’m just trying to reconcile the data I’m seeing in the logs and your response to my post.

I’m going to extract the pertinent entries from our app logs, obfuscate and add them in

(I’ll also add some of what I’ve written here to the description above for the benefit
of anyone new to the ticket.)



> Broker faills to start after ungraceful shutdown due to non-monotonically incrementing
offsets in logs
> ------------------------------------------------------------------------------------------------------
>                 Key: KAFKA-3919
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3919
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions:
>            Reporter: Andy Coates
> Hi All,
> I encountered an issue with Kafka following a power outage that saw a proportion of our
cluster disappear. When the power came back on several brokers halted on start up with the
> {noformat}
> 	Fatal error during KafkaServerStartable startup. Prepare to shutdown”
> 	kafka.common.InvalidOffsetException: Attempt to append an offset (1239742691) to position
35728 no larger than the last offset appended (1239742822) to /data3/kafka/mt_xp_its_music_main_itsevent-20/00000000001239444214.index.
> 	at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
> 	at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
> 	at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
> 	at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
> 	at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
> 	at kafka.log.LogSegment.recover(LogSegment.scala:188)
> 	at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
> 	at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
> 	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> 	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> 	at kafka.log.Log.loadSegments(Log.scala:160)
> 	at kafka.log.Log.<init>(Log.scala:90)
> 	at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
> 	at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only way to recover the brokers was to delete the log files that contained non-monotonically
incrementing offsets.
> I’ve spent some time digging through the logs and I feel I may have worked out the
sequence of events leading to this issue, (though this is based on some assumptions I've made
about the way Kafka is working, which may be wrong).
> Given:
> * A topic that is produced to using acks = 1
> * A topic that is produced to using gzip compression
> * A topic that has min.isr set to less than the number of replicas, (i.e. min.isr=2,
> * Following ISRs are lagging behind the leader by some small number of messages, (which
is normal with acks=1)
> * brokers are configured with fairly large zk session timeout e.g. 30s.
> * brokers are configured so that unclean leader elections are disabled.
> Then:
> When something like a power outage takes out all three replicas, it's possible to get into
a state such that the indexes won’t rebuild on a restart and a broker fails to start. This
can happen when:
> * Enough brokers, but not the pre-outage leader, come on-line for the partition to be
> * Producers produce enough records to the partition that the head offset is now greater
than the pre-outage leader head offset.
> * The pre-outage leader comes back online.
> At this point the logs on the pre-outage leader have diverged from the other replicas.
 It has some messages that are not in the other replicas, and the other replicas have some
records not in the pre-outage leader's log - at the same offsets.
> I’m assuming that because the current leader has a higher offset than the pre-outage
leader, the pre-outage leader just starts following the leader and requesting the records
it thinks it's missing.
> I’m also assuming that because the producers were using gzip, so each record is actually
a compressed message set, when the pre-outage leader requests records from the leader,
the offset it requests could just happen to be in the middle of a compressed batch, but
the leader returns the full batch.  When the pre-outage leader appends this batch to its own
log it thinks all is OK. But what has happened is that the offsets in the log are no longer
monotonically incrementing. Instead they actually dip by the number of records in the compressed
batch that were before the requested offset.  If and when this broker restarts this dip may
be at the 4K boundary the indexer checks. If it is, the broker won’t start.
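
The assumed sequence above can be illustrated with a toy model. This is only a sketch of the assumption as stated, not confirmed Kafka behaviour: logs are plain lists of batches, the `fetch` helper is hypothetical, and all offsets are made up.

```python
from typing import List

Batch = List[int]  # offsets of the records inside one compressed batch


def fetch(leader_log: List[Batch], fetch_offset: int) -> List[Batch]:
    """Toy leader: return whole batches, starting with the one containing fetch_offset."""
    return [batch for batch in leader_log if batch[-1] >= fetch_offset]


# Hypothetical pre-outage leader log: one or two records per batch, ending at offset 105.
old_leader_log = [[100, 101], [102], [103, 104], [105]]

# Hypothetical new leader log: it only had offsets up to 101 before the outage,
# then accepted one large batch (102..109) from producers clearing a backlog.
new_leader_log = [[100, 101], [102, 103, 104, 105, 106, 107, 108, 109], [110]]

# The old leader restarts as a follower and requests the next offset it thinks it is missing.
fetch_offset = old_leader_log[-1][-1] + 1
old_leader_log.extend(fetch(new_leader_log, fetch_offset))

# First offset of each batch now in the old leader's log: the sequence is no
# longer increasing where the full batch was appended.
first_offsets = [batch[0] for batch in old_leader_log]
print(first_offsets)
```

In this toy run the requested offset falls in the middle of the large batch, so the appended batch starts four records below the requested offset, which is the dip described above.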
> Several of our brokers were unlucky enough to hit that 4K boundary, causing a protracted
outage.  We’ve written a little utility that shows several more brokers have a dip outside
of the 4K boundary.
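
Why only some dips stop a broker from starting can be sketched as follows. This is a simplified model, loosely based on the check reported in the stack trace: it assumes an index entry is added only after more than 4096 bytes have been scanned since the previous entry, and that each entry's offset must exceed the previous entry's. The class, function, batch sizes and offsets are all hypothetical.

```python
from typing import List, Tuple


class InvalidOffsetError(Exception):
    """Stand-in for the kafka.common.InvalidOffsetException seen in the stack trace."""


def rebuild_index(batches: List[Tuple[int, int]], interval_bytes: int = 4096) -> List[Tuple[int, int]]:
    """Toy index rebuild over (first_offset, size_in_bytes) batches.

    An entry is added only once more than interval_bytes have been scanned since
    the previous entry, and each entry's offset must exceed the previous entry's.
    """
    index: List[Tuple[int, int]] = []
    position = 0
    last_entry_position = 0
    for first_offset, size in batches:
        if position - last_entry_position > interval_bytes:
            if index and first_offset <= index[-1][0]:
                raise InvalidOffsetError(
                    f"offset {first_offset} at position {position} "
                    f"no larger than last offset appended {index[-1][0]}"
                )
            index.append((first_offset, position))
            last_entry_position = position
        position += size
    return index


# Same dip (a batch starting at 125), at two different positions in the log.
log_dip_between_entries = [(100, 1500), (110, 1500), (120, 1500), (130, 1500),
                           (140, 1500), (125, 1500), (160, 1500)]
log_dip_at_entry = [(100, 1500), (110, 1500), (120, 1500), (130, 1500),
                    (140, 1500), (150, 1500), (125, 1500)]

print(rebuild_index(log_dip_between_entries))  # dip is skipped over, rebuild succeeds
try:
    rebuild_index(log_dip_at_entry)
except InvalidOffsetError as err:
    print("rebuild failed:", err)
```

In this model a dip that falls between two index entries goes unnoticed, which would be consistent with the brokers whose dips sit outside the 4K boundary still starting.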
> There are some assumptions in there, which I’ve not got around to confirming / denying.
(A quick attempt to recreate this failed and I've not found the time to invest more).
> Of course I'd really appreciate the community / experts stepping in and commenting on
whether my assumptions are right or wrong, or if there is another explanation to the problem.

> But assuming I’m mostly right, then the fact the broker won’t start is obviously
a bug, and one I’d like to fix.  A Kafka broker should not corrupt its own log during normal
operation to the point that it can’t restart!
> A secondary issue is whether we think the divergent logs are acceptable. This may be deemed
acceptable given the producers have chosen availability over consistency when they produced
with acks = 1.  Though personally, the system having diverging replicas of an immutable commit
log just doesn't sit right.
> I see us having a few options here:
> * Have the replicas detect the divergence of their logs e.g. a follower compares the
checksum of its last record with the same offset on the leader. The follower can then work out
that its log has diverged from the leader, at which point it could either halt, stop replicating
that partition or search backwards to find the point of divergence, truncate and recover
(possibly saving the truncated part somewhere). This would be a protocol change for Kafka.
 This solution trades availability (you’ve got fewer ISRs during the extended re-sync process)
for consistency.
> * Leave the logs as they are and have the indexing of offsets in the log on start up
handle such a situation gracefully.  This leaves logs in a divergent state between replicas
(meaning replays would yield different messages if the leader was up or down), but gives better
availability (no time spent not being an ISR while it repairs any divergence).
> * Support multiple options and allow it to be tuned, ideally by topic.
> * Something else...
> I’m happy/keen to contribute here. But I’d like to first discuss which option should
be investigated.
> Andy
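
The first option listed above (compare checksums, search backwards for the point of divergence, then truncate) can be sketched as below. This is an illustration only, not an existing Kafka protocol: logs are modelled as dictionaries from offset to payload, CRC32 stands in for the record checksum, and the helper names are hypothetical. It also assumes that once the two logs agree at an offset they agree at every earlier offset.

```python
import zlib
from typing import Dict, Optional


def checksums(records: Dict[int, bytes]) -> Dict[int, int]:
    """Map each offset to a CRC32 of its payload."""
    return {offset: zlib.crc32(payload) for offset, payload in records.items()}


def last_common_offset(follower: Dict[int, int], leader: Dict[int, int]) -> Optional[int]:
    """Search backwards from the follower's last offset for a record matching the leader's."""
    for offset in sorted(follower, reverse=True):
        if leader.get(offset) == follower[offset]:
            return offset
    return None


# Hypothetical divergent histories: both replicas hold offsets 102 and 103,
# but with different contents.
follower_records = {100: b"a", 101: b"b", 102: b"c-old", 103: b"d-old"}
leader_records = {100: b"a", 101: b"b", 102: b"c-new", 103: b"d-new", 104: b"e"}

common = last_common_offset(checksums(follower_records), checksums(leader_records))

# Truncate the follower to the last offset it shares with the leader; the
# removed records could be saved elsewhere before being dropped.
truncated = {o: p for o, p in follower_records.items() if common is not None and o <= common}
print(common, sorted(truncated))
```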
