Date: Tue, 5 Jul 2016 15:04:11 +0000 (UTC)
From: "Andy Coates (JIRA)"
To: dev@kafka.apache.org
Subject: [jira] [Commented] (KAFKA-3919) Broker fails to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

    [ https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362604#comment-15362604 ]

Andy Coates commented on KAFKA-3919:
------------------------------------

Hi [~junrao], thanks for taking the time to look at this.

First off, we have unclean leadership elections disabled. (We did later enable them to help get around some other issues we were having, but this was several hours after this issue manifested.)

We did look through the data logs that were causing the brokers to not start. What we found before the incident was a monotonically increasing offset, where each compressed batch normally contained one or two records. Then there is a batch that contains many records, whose first record has an offset below the previous batch and whose last record has an offset above the previous batch. Following on from this there continues a period of large batches, with monotonically increasing offsets, and then the log returns to batches with one or two records.

Our working assumption here is that the period before the offset dip is pre-outage normal operation. The period of larger batches is from just after the outage, where producers have a backlog to process when the partition becomes available, and then things return to normal batch sizes again once the backlog clears.
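To make that dip concrete: the check involved is trivial. A rough sketch (illustrative only, not our actual tooling; the class name and input format are made up) that reads one record offset per line, as extracted from a log dump, and flags every non-monotonic step looks like this:

{code:java}
import java.io.BufferedReader;
import java.io.InputStreamReader;

// Illustrative checker: reads one record offset per line on stdin (e.g.
// extracted from a log dump) and flags every point where the offsets stop
// monotonically increasing.
public class OffsetDipCheck {
    public static void main(String[] args) throws Exception {
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        long previous = -1;
        long lineNumber = 0;
        String line;
        while ((line = in.readLine()) != null) {
            lineNumber++;
            if (line.trim().isEmpty()) {
                continue;
            }
            long offset = Long.parseLong(line.trim());
            if (previous >= 0 && offset <= previous) {
                System.out.printf("dip at line %d: offset %d follows offset %d%n",
                        lineNumber, offset, previous);
            }
            previous = Math.max(previous, offset);
        }
    }
}
{code}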
We did also look through Kafka's application logs to try and piece together the series of events leading up to this.

Here's what I know happened, with regards to one partition that has issues, from the logs:

Prior to outage:
Replicas for the partition are brokers 2011, 2012, 2024, with 2024 being the preferred leader.
Producers using acks=1, compression=gzip
Brokers configured with unclean.elections=false, zk.session-timeout=36s

Post outage:
2011 comes up first, (also as the Controller), recovers unflushed log segment 1239444214, completes load with offset 1239740602, and becomes leader of the partition.
2012 comes up next, recovers its log, recovers unflushed log segment 1239444214, truncates to offset 1239742830, (that's 2,228 records ahead of the recovered offset of the current leader), and starts following.
2024 comes up quickly after 2012, recovers unflushed log segment 1239444214, truncates to offset 1239742250, (that's 1,648 records ahead of the recovered offset of the current leader), and starts following.
The Controller adds 2024 to the replica set just before 2024 halts due to another partition having an offset greater than the leader.
The Controller adds 2012 to the replica set just before 2012 halts due to another partition having an offset greater than the leader.
When 2012 is next restarted, it fails to fully start as it's complaining of invalid offsets in the log.

You'll notice that the offsets the brokers truncate to are different for each of the three brokers.

Given that I can write to the partition with only one broker available, and that I can then take this broker down and bring up a different one from the replica set and write to that one, how does Kafka currently look to reconcile these different histories when the first node is brought back online? I know that if the first node has a greater offset it will halt when it tries to follow the second, but what happens if the first node has a lower offset?

Maybe the above scenario is correctly handled and I'm off down a tangent! (I'd appreciate any info to improve my understanding of Kafka and help me figure out what is happening here.) I'm just trying to reconcile the data I'm seeing in the logs with your response to my post.

I'm going to extract the pertinent entries from our app logs, obfuscate them and add them in here.

(I'll also add some of what I've written here to the description above for the benefit of anyone new to the ticket.)
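For anyone wanting to reproduce the preconditions, the producer side of the setup described above (acks=1 with gzip compression) looks roughly like the following with the Java client; the broker address and topic name below are placeholders:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Illustrative producer settings matching the scenario above: acks=1 and
// gzip compression. Broker address and topic name are placeholders.
public class Acks1GzipProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("acks", "1");                  // leader-only acknowledgement
        props.put("compression.type", "gzip");   // records sent as compressed batches
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("example-topic", "example-value"));
        producer.close();
    }
}
{code}

With acks=1 the leader acknowledges a write before the followers have replicated it, which is why the followers can legitimately be a few records behind at the moment of the outage.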
Thanks,

Andy


> Broker fails to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs
> ------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3919
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3919
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.9.0.1
>            Reporter: Andy Coates
>
> Hi All,
> I encountered an issue with Kafka following a power outage that saw a proportion of our cluster disappear. When the power came back on, several brokers halted on start up with the error:
> {noformat}
>     Fatal error during KafkaServerStartable startup. Prepare to shutdown
>     kafka.common.InvalidOffsetException: Attempt to append an offset (1239742691) to position 35728 no larger than the last offset appended (1239742822) to /data3/kafka/mt_xp_its_music_main_itsevent-20/00000000001239444214.index.
>     at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>     at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>     at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>     at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>     at kafka.log.LogSegment.recover(LogSegment.scala:188)
>     at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>     at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>     at kafka.log.Log.loadSegments(Log.scala:160)
>     at kafka.log.Log.<init>(Log.scala:90)
>     at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only way to recover the brokers was to delete the log files that contained non-monotonically incrementing offsets.
> I've spent some time digging through the logs and I feel I may have worked out the sequence of events leading to this issue, (though this is based on some assumptions I've made about the way Kafka is working, which may be wrong).
> Given:
> * A topic that is produced to using acks = 1
> * A topic that is produced to using gzip compression
> * A topic that has min.isr set to less than the number of replicas, (i.e. min.isr=2, #replicas=3)
> * Following ISRs are lagging behind the leader by some small number of messages, (which is normal with acks=1)
> * Brokers are configured with a fairly large zk session timeout, e.g. 30s.
> * Brokers are configured so that unclean leader elections are disabled.
> Then:
> When something like a power outage takes out all three replicas, it's possible to get into a state such that the indexes won't rebuild on a restart and a broker fails to start. This can happen when:
> * Enough brokers, but not the pre-outage leader, come online for the partition to be writeable
> * Producers produce enough records to the partition that the head offset is now greater than the pre-outage leader's head offset.
> * The pre-outage leader comes back online.
> At this point the logs on the pre-outage leader have diverged from the other replicas. It has some messages that are not in the other replicas, and the other replicas have some records not in the pre-outage leader's log - at the same offsets.
> I'm assuming that because the current leader has a higher offset than the pre-outage leader, the pre-outage leader just starts following the leader and requesting the records it thinks it's missing.
> I'm also assuming that because the producers were using gzip, so each record is actually a compressed message set, that when the pre-outage leader requests records from the leader, the offset it requests could just happen to be in the middle of a compressed batch, but the leader returns the full batch. When the pre-outage leader appends this batch to its own log it thinks all is OK. But what has happened is that the offsets in the log are no longer monotonically incrementing. Instead they actually dip by the number of records in the compressed batch that were before the requested offset. If and when this broker restarts, this dip may be at the 4K boundary the indexer checks. If it is, the broker won't start.
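> To spell out why only some dips are fatal: on recovery the offset index is rebuilt with an entry roughly every index.interval.bytes (4096 by default), and each new index entry must have a larger offset than the one before it. The sketch below models that check in simplified form (illustrative only, not the actual Kafka source; the offsets and sizes are made up). A dip that falls between index entries goes unnoticed, while one that lands on an indexed record aborts recovery with the error above.
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
>
> // Simplified model of the index rebuild performed on recovery (illustrative
> // only): an index entry is taken roughly every INDEX_INTERVAL_BYTES of log
> // data, and each entry's offset must be larger than the previous entry's.
> public class IndexRebuildSketch {
>     static final int INDEX_INTERVAL_BYTES = 4096; // default index.interval.bytes
>
>     public static void main(String[] args) {
>         // (offset, sizeInBytes) pairs standing in for the batches read back at
>         // recovery; the third one dips below its predecessors and happens to
>         // land where an index entry is taken.
>         long[][] batches = { {100, 5000}, {101, 5000}, {99, 5000}, {102, 5000} };
>
>         List<Long> indexedOffsets = new ArrayList<>();
>         long lastIndexedOffset = -1;
>         long bytesSinceLastEntry = 0;
>         try {
>             for (long[] batch : batches) {
>                 long offset = batch[0];
>                 long sizeInBytes = batch[1];
>                 if (bytesSinceLastEntry > INDEX_INTERVAL_BYTES) {
>                     if (offset <= lastIndexedOffset) {
>                         // The condition behind the InvalidOffsetException above.
>                         throw new IllegalStateException("Attempt to append an offset (" + offset
>                                 + ") no larger than the last offset appended (" + lastIndexedOffset + ")");
>                     }
>                     indexedOffsets.add(offset);
>                     lastIndexedOffset = offset;
>                     bytesSinceLastEntry = 0;
>                 }
>                 bytesSinceLastEntry += sizeInBytes;
>             }
>             System.out.println("Index rebuilt, entries at offsets: " + indexedOffsets);
>         } catch (IllegalStateException e) {
>             System.out.println("Recovery would fail: " + e.getMessage());
>         }
>     }
> }
> {code}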
> Several of our brokers were unlucky enough to hit that 4K boundary, causing a protracted outage. We've written a little utility that shows several more brokers have a dip outside of the 4K boundary.
> There are some assumptions in there, which I've not got around to confirming / denying. (A quick attempt to recreate this failed and I've not found the time to invest more.)
> Of course I'd really appreciate the community / experts stepping in and commenting on whether my assumptions are right or wrong, or if there is another explanation to the problem.
> But assuming I'm mostly right, then the fact the broker won't start is obviously a bug, and one I'd like to fix. A Kafka broker should not corrupt its own log during normal operation to the point that it can't restart!
> A secondary issue is whether we think the divergent logs themselves are acceptable. This may be deemed acceptable given the producers have chosen availability over consistency when they produced with acks = 1. Though personally, the system having diverging replicas of an immutable commit log just doesn't sit right.
> I see us having a few options here:
> * Have the replicas detect the divergence of their logs, e.g. a follower compares the checksum of its last record with the record at the same offset on the leader. The follower can then work out that its log has diverged from the leader. At which point it could either halt, stop replicating that partition, or search backwards to find the point of divergence, truncate and recover, (possibly saving the truncated part somewhere). This would be a protocol change for Kafka. This solution trades availability, (you've got fewer ISRs during the extended re-sync process), for consistency.
> * Leave the logs as they are and have the indexing of offsets in the log on start up handle such a situation gracefully. This leaves logs in a divergent state between replicas, (meaning replays would yield different messages depending on which replica was leader), but gives better availability, (no time spent not being an ISR while it repairs any divergence).
> * Support multiple options and allow it to be tuned, ideally by topic.
> * Something else...
> I'm happy/keen to contribute here. But I'd like to first discuss which option should be investigated.
> Andy
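
To make the checksum idea in the first option above a little more concrete, the comparison being described is roughly the following. This is purely illustrative (CRC32 over made-up payloads, hypothetical class name), not a worked-out protocol change:

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Conceptual sketch of the divergence check: before fetching, a follower
// could compare a checksum of its record at its current end offset with the
// leader's record at the same offset; a mismatch means the logs have
// diverged at or before that offset.
public class DivergenceCheckSketch {

    static long checksum(byte[] recordPayload) {
        CRC32 crc = new CRC32();
        crc.update(recordPayload);
        return crc.getValue();
    }

    public static void main(String[] args) {
        // Made-up payloads standing in for the record at the same offset on
        // the follower and on the leader.
        byte[] followerRecord = "written-by-old-leader".getBytes(StandardCharsets.UTF_8);
        byte[] leaderRecord = "written-by-new-leader".getBytes(StandardCharsets.UTF_8);

        if (checksum(followerRecord) != checksum(leaderRecord)) {
            System.out.println("Logs have diverged at this offset: "
                    + "halt, stop replicating, or search backwards for the divergence point.");
        } else {
            System.out.println("Records match: safe to continue following from here.");
        }
    }
}
{code}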