Date: Fri, 1 Jul 2016 09:33:11 +0000 (UTC)
From: "Andy Coates (JIRA)"
To: dev@kafka.apache.org
Reply-To: dev@kafka.apache.org
Subject: [jira] [Updated] (KAFKA-3919) Broker fails to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8

     [ https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andy Coates updated KAFKA-3919:
-------------------------------
    Description: 
Hi All,

I encountered an issue with Kafka following a power outage that saw a proportion of our cluster disappear. When the power came back on, several brokers halted on start-up with the error:

{noformat}
	Fatal error during KafkaServerStartable startup. Prepare to shutdown
	kafka.common.InvalidOffsetException: Attempt to append an offset (1239742691) to position 35728 no larger than the last offset appended (1239742822) to /data3/kafka/mt_xp_its_music_main_itsevent-20/00000000001239444214.index.
	at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
	at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
	at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
	at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
	at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
	at kafka.log.LogSegment.recover(LogSegment.scala:188)
	at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
	at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
	at kafka.log.Log.loadSegments(Log.scala:160)
	at kafka.log.Log.<init>(Log.scala:90)
	at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
	at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{noformat}

The only way to recover the brokers was to delete the log files that contained non-monotonically incrementing offsets.

I've spent some time digging through the logs and I feel I may have worked out the sequence of events leading to this issue (though this is based on some assumptions I've made about the way Kafka works, which may be wrong):

Given:
* A topic that is produced to using acks = 1
* A topic that is produced to using gzip compression
* A topic that has min.isr set to less than the number of replicas (i.e. min.isr=2, #replicas=3)
* Followers in the ISR are lagging behind the leader by some small number of messages (which is normal with acks=1)
* Brokers are configured with a fairly large ZK session timeout, e.g. 30s
* Brokers are configured so that unclean leader elections are disabled

Then:
When something like a power outage takes out all three replicas, it's possible to get into a state such that the indexes won't rebuild on a restart and a broker fails to start. This can happen when:
* Enough brokers, but not the pre-outage leader, come online for the partition to be writeable
* Producers produce enough records to the partition that the head offset is now greater than the pre-outage leader's head offset
* The pre-outage leader comes back online

At this point the logs on the pre-outage leader have diverged from the other replicas. It has some messages that are not in the other replicas, and the other replicas have some records that are not in the pre-outage leader's log - at the same offsets.

I'm assuming that because the current leader has a higher offset than the pre-outage leader, the pre-outage leader just starts following the leader and requesting the records it thinks it's missing.

I'm also assuming that, because the producers were using gzip, each record is actually a compressed message set. So when the pre-outage leader requests records from the leader, the offset it requests can happen to fall in the middle of a compressed batch, but the leader returns the full batch. When the pre-outage leader appends this batch to its own log it thinks all is OK, but what has actually happened is that the offsets in the log are no longer monotonically incrementing: they dip by the number of records in the compressed batch that precede the requested offset. If and when this broker restarts, that dip may land on the 4K boundary the indexer checks. If it does, the broker won't start.
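To make the suspected mechanism concrete, below is a small, self-contained Scala sketch of the failure mode as I understand it. It is a toy model, not Kafka code: the offsets are made up and the "index every 4th record" sampling just stands in for the real index's every-4K-bytes interval, but the final check mirrors the spirit of the OffsetIndex.append check in the stack trace above.

{code:scala}
// Toy model of the suspected failure mode - NOT Kafka code.
// Offsets and the sampling interval are illustrative assumptions only.
object OffsetDipSketch extends App {

  // Follower (the pre-outage leader) before re-fetching: offsets increment monotonically.
  val followerLog = (1239742815L to 1239742822L).toVector

  // The follower asks for 1239742823, but that offset sits inside a compressed
  // batch, so the leader returns the *whole* batch, starting a few records earlier.
  val fullBatchFromLeader = (1239742819L to 1239742826L).toVector

  // The follower appends the whole batch, producing a "dip" in its offsets.
  val appended = followerLog ++ fullBatchFromLeader

  // Index every 4th record, standing in for the every-4K-bytes sampling the
  // real offset index performs while a segment is recovered on start-up.
  val indexed = appended.zipWithIndex.collect { case (offset, i) if i % 4 == 0 => offset }

  // Same spirit as the check in OffsetIndex.append: each indexed offset must be
  // strictly larger than the previous one. If a dip lands on an indexed record,
  // recovery fails - which is what keeps the broker from starting.
  indexed.sliding(2).foreach {
    case Vector(prev, next) if next <= prev =>
      println(s"InvalidOffsetException: offset $next is not larger than last appended $prev")
    case Vector(prev, next) =>
      println(s"indexed $next after $prev - OK")
    case _ => ()
  }
}
{code}

Whether the dip lands on an indexed record is purely a matter of alignment, which would explain why only some of our brokers failed to start.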
Several of our brokers were unlucky enough to hit that 4K boundary, causing a protracted outage. We've written a little utility that shows several more brokers have a dip outside of the 4K boundary.

There are some assumptions in there which I've not got around to confirming or refuting. (A quick attempt to recreate this failed and I've not found the time to invest more.)

Of course I'd really appreciate the community / experts stepping in and commenting on whether my assumptions are right or wrong, or whether there is another explanation for the problem.

But assuming I'm mostly right, the fact that the broker won't start is obviously a bug, and one I'd like to fix. A Kafka broker should not corrupt its own log during normal operation to the point that it can't restart!

A secondary issue is whether we think the divergent logs are acceptable. This may be deemed acceptable given that the producers chose availability over consistency when they produced with acks = 1. Though personally, the system having diverging replicas of an immutable commit log just doesn't sit right with me.

I see us having a few options here:
* Have the replicas detect the divergence of their logs, e.g. a follower compares the checksum of its last record with the record at the same offset on the leader. The follower can then work out that its log has diverged from the leader. At that point it could either halt, stop replicating that partition, or search backwards to find the point of divergence, truncate and recover (possibly saving the truncated part somewhere). This would be a protocol change for Kafka. This solution trades availability (you've got fewer ISRs during the extended re-sync process) for consistency. (A rough sketch of the detection step follows this list.)
* Leave the logs as they are and have the indexing of offsets in the log on start-up handle such a situation gracefully. This leaves logs in a divergent state between replicas (meaning replays would yield different messages depending on whether the old leader is up or down), but gives better availability (no time spent not being an ISR while it repairs any divergence). (Also sketched below.)
* Support multiple options and allow it to be tuned, ideally per topic.
* Something else...
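To flesh out the first option a little, here is a minimal Scala sketch of the divergence check a follower could run before resuming replication. It is purely illustrative: FetchedRecord, the CRC32-based checksum and the fetchFromLeader callback are placeholders I've invented for the sketch, not existing Kafka APIs.

{code:scala}
import java.util.zip.CRC32

// Hypothetical record shape invented for this sketch - not a Kafka class.
final case class FetchedRecord(offset: Long, payload: Array[Byte]) {
  def checksum: Long = {
    val crc = new CRC32()
    crc.update(payload)
    crc.getValue
  }
}

object DivergenceCheckSketch {

  /** True if the follower's last record differs from the leader's record at
    * the same offset, i.e. the two logs have diverged. */
  def hasDiverged(followerLast: FetchedRecord,
                  fetchFromLeader: Long => Option[FetchedRecord]): Boolean =
    fetchFromLeader(followerLast.offset) match {
      case Some(leaderRecord) => leaderRecord.checksum != followerLast.checksum
      case None               => true // leader no longer has a record at that offset
    }

  def main(args: Array[String]): Unit = {
    // The pre-outage leader's last appended record...
    val followerLast = FetchedRecord(1239742822L, "record-from-old-leader".getBytes("UTF-8"))

    // ...and the new leader, which holds a *different* record at that same offset.
    val leaderLog = Map(
      1239742822L -> FetchedRecord(1239742822L, "record-from-new-leader".getBytes("UTF-8")))

    if (hasDiverged(followerLast, leaderLog.get))
      println("Divergence detected: halt, or truncate back to the point of divergence and re-fetch.")
    else
      println("Logs agree at the follower's last offset - safe to continue replicating.")
  }
}
{code}

The only protocol addition the sketch assumes is some way for a follower to read back the record (or just its checksum) at a given offset from the leader.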
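For the second option, here is an equally rough sketch of what "handle it gracefully" could mean for index rebuilding on start-up: rather than throwing when an offset does not advance, the rebuild could skip index entries that would go backwards. Again this is a toy model of the idea - the IndexEntry type and the numbers are invented, and this is not the real kafka.log.OffsetIndex code.

{code:scala}
// Toy model of a "tolerant" index rebuild - NOT the real kafka.log.OffsetIndex.
// An index entry is just (offset, position); the positions here are invented.
object TolerantIndexRebuildSketch extends App {

  final case class IndexEntry(offset: Long, position: Int)

  /** Strict rebuild (roughly the current behaviour): fail on any non-monotonic offset. */
  def rebuildStrict(entries: Seq[IndexEntry]): Seq[IndexEntry] =
    entries.foldLeft(Vector.empty[IndexEntry]) { (acc, e) =>
      acc.lastOption.foreach { last =>
        require(e.offset > last.offset,
          s"Attempt to append an offset (${e.offset}) no larger than the last offset appended (${last.offset})")
      }
      acc :+ e
    }

  /** Tolerant rebuild: silently skip entries whose offset does not advance. */
  def rebuildTolerant(entries: Seq[IndexEntry]): Seq[IndexEntry] =
    entries.foldLeft(Vector.empty[IndexEntry]) { (acc, e) =>
      if (acc.lastOption.forall(_.offset < e.offset)) acc :+ e else acc
    }

  // A log whose offsets dip because a full compressed batch was re-appended.
  val candidates = Seq(
    IndexEntry(1239742815L, 0),
    IndexEntry(1239742822L, 4096),
    IndexEntry(1239742819L, 8192), // the dip
    IndexEntry(1239742826L, 12288))

  println(rebuildTolerant(candidates).map(_.offset).mkString("tolerant index: ", ", ", ""))
  // rebuildStrict(candidates) would throw, which is what keeps the broker from starting today.
}
{code}

The trade-off is exactly the one described above: the broker starts, but the divergence between replicas is left in place.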
I'm happy/keen to contribute here. But I'd like to first discuss which option should be investigated.
Andy


> Broker fails to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs
> ------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3919
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3919
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.9.0.1
>            Reporter: Andy Coates
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)