kafka-jira mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6683) ReplicaFetcher crashes with "Attempted to complete a transaction which was not started"
Date Thu, 22 Mar 2018 07:54:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409185#comment-16409185
] 

ASF GitHub Bot commented on KAFKA-6683:
---------------------------------------

hachikuji opened a new pull request #4755: KAFKA-6683; Ensure producer state not mutated prior
to append
URL: https://github.com/apache/kafka/pull/4755
 
 
   We were unintentionally mutating the cached queue of batches prior to appending to the
log. This could have several bad consequences if the append ultimately failed. In the reporter's
case, it caused the snapshot to be invalid after a segment roll. The snapshot contained producer
state at offsets higher than the snapshot offset. If we ever had to load from that snapshot,
the state was left inconsistent, which led to an error that ultimately crashed the replica
fetcher.
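
   For illustration, here is a minimal sketch of the invariant the bad snapshot violates
   (`ProducerEntry` and `validateSnapshot` are hypothetical names, not the actual Kafka classes):
   a snapshot taken at a given offset should only describe producer state at or below that offset.

   ```scala
   // Hypothetical stand-in for the cached per-producer metadata.
   case class ProducerEntry(producerId: Long, lastOffset: Long)

   // A snapshot written at `snapshotOffset` must not describe offsets beyond it.
   def validateSnapshot(snapshotOffset: Long, entries: Seq[ProducerEntry]): Unit = {
     val tooNew = entries.filter(_.lastOffset > snapshotOffset)
     if (tooNew.nonEmpty)
       throw new IllegalStateException(
         s"Snapshot at offset $snapshotOffset contains producer state beyond that offset: $tooNew")
   }

   // If the cached queue is mutated before an append that ultimately fails, the cache can already
   // reflect offsets from the rejected batch; a snapshot written after a segment roll then violates
   // this check, and reloading it leaves the producer state inconsistent.
   ```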
   
   The fix required some refactoring to avoid sharing the same underlying queue inside `ProducerAppendInfo`.
I have added test cases which reproduce the invalid snapshot state. I have also made an effort
to clean up logging since it was not easy to track this problem down.
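
   As a sketch of the general pattern (illustrative `CachedProducerState` names, not the actual
   refactoring in this PR): the pending append is built against a copy of the cached state, and the
   cache is only updated once the log append has succeeded.

   ```scala
   // Hypothetical sketch of "stage against a copy, publish only on success", so the pending
   // append never shares (and never mutates) the cache's underlying queue.
   class CachedProducerState {
     // Stand-in for the cached batch metadata; kept immutable so staging never aliases it.
     private var batches: Vector[Long] = Vector.empty

     // Build the pending view as a copy; a failed append leaves `batches` untouched.
     def stage(newBatchOffsets: Seq[Long]): Vector[Long] =
       batches ++ newBatchOffsets

     // Publish the staged view only after the log append has succeeded.
     def publish(staged: Vector[Long]): Unit =
       batches = staged
   }
   ```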
   
   One final note: I have removed the duplicate check inside `ProducerStateManager` since
it was both redundant and incorrect. The redundancy was in the checking of the cached batches:
we already check these in `Log.analyzeAndValidateProducerState`. The incorrectness was the
handling of sequence number overflow: we were only handling one very specific case of overflow,
but others would have resulted in an invalid assertion. Instead, we now throw `OutOfOrderSequenceException`.
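
   For context, producer sequence numbers are 32-bit and wrap around from `Int.MaxValue` back to 0, so
   a bare `next == last + 1` comparison is not sufficient. A minimal sketch of the kind of check implied
   above (the `checkSequence` helper is hypothetical, not the code in this PR):

   ```scala
   import org.apache.kafka.common.errors.OutOfOrderSequenceException

   // Hypothetical helper: accept the next sequence only if it directly follows the last one,
   // including the wraparound case, and otherwise throw OutOfOrderSequenceException rather
   // than tripping an assertion.
   def checkSequence(lastSeq: Int, nextSeq: Int): Unit = {
     val inSequence =
       nextSeq == lastSeq + 1 || (lastSeq == Int.MaxValue && nextSeq == 0)
     if (!inSequence)
       throw new OutOfOrderSequenceException(
         s"Out of order sequence number: $nextSeq (incoming), $lastSeq (last appended)")
   }
   ```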
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> ReplicaFetcher crashes with "Attempted to complete a transaction which was not started"

> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6683
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6683
>             Project: Kafka
>          Issue Type: Bug
>          Components: replication
>    Affects Versions: 1.0.0
>         Environment: os: GNU/Linux 
> arch: x86_64
> Kernel: 4.9.77
> jvm: OpenJDK 1.8.0
>            Reporter: Chema Sanchez
>            Assignee: Jason Gustafson
>            Priority: Critical
>         Attachments: server.properties
>
>
> We have been experiencing this issue lately when restarting or replacing brokers of our
Kafka clusters during maintenance operations.
> After a broker has been restarted or replaced, it may perform normally for some minutes and then
suddenly throw the following exception and stop replicating some partitions:
> {code:none}
> 2018-03-15 17:23:01,482] ERROR [ReplicaFetcher replicaId=12, leaderId=10, fetcherId=0]
Error due to (kafka.server.ReplicaFetcherThread)
> java.lang.IllegalArgumentException: Attempted to complete a transaction which was not
started
>         at kafka.log.ProducerStateManager.completeTxn(ProducerStateManager.scala:720)
>         at kafka.log.Log.$anonfun$loadProducersFromLog$4(Log.scala:540)
>         at kafka.log.Log.$anonfun$loadProducersFromLog$4$adapted(Log.scala:540)
>         at scala.collection.immutable.List.foreach(List.scala:389)
>         at scala.collection.generic.TraversableForwarder.foreach(TraversableForwarder.scala:35)
>         at scala.collection.generic.TraversableForwarder.foreach$(TraversableForwarder.scala:35)
>         at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:44)
>         at kafka.log.Log.loadProducersFromLog(Log.scala:540)
>         at kafka.log.Log.$anonfun$loadProducerState$5(Log.scala:521)
>         at kafka.log.Log.$anonfun$loadProducerState$5$adapted(Log.scala:514)
>         at scala.collection.Iterator.foreach(Iterator.scala:929)
>         at scala.collection.Iterator.foreach$(Iterator.scala:929)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
>         at scala.collection.IterableLike.foreach(IterableLike.scala:71)
>         at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at kafka.log.Log.loadProducerState(Log.scala:514)
>         at kafka.log.Log.$anonfun$truncateTo$2(Log.scala:1487)
>         at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12)
>         at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
>         at kafka.log.Log.truncateTo(Log.scala:1467)
>         at kafka.log.LogManager.$anonfun$truncateTo$2(LogManager.scala:454)
>         at kafka.log.LogManager.$anonfun$truncateTo$2$adapted(LogManager.scala:445)
>         at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:120)
>         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788)
>         at kafka.log.LogManager.truncateTo(LogManager.scala:445)
>         at kafka.server.ReplicaFetcherThread.$anonfun$maybeTruncate$1(ReplicaFetcherThread.scala:281)
>         at scala.collection.Iterator.foreach(Iterator.scala:929)
>         at scala.collection.Iterator.foreach$(Iterator.scala:929)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
>         at scala.collection.IterableLike.foreach(IterableLike.scala:71)
>         at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at kafka.server.ReplicaFetcherThread.maybeTruncate(ReplicaFetcherThread.scala:265)
>         at kafka.server.AbstractFetcherThread.$anonfun$maybeTruncate$2(AbstractFetcherThread.scala:135)
>         at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
>         at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:132)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> [2018-03-15 17:23:01,497] INFO [ReplicaFetcher replicaId=12, leaderId=10, fetcherId=0]
Stopped (kafka.server.ReplicaFetcherThread)
> {code}
> Since all brokers in a cluster are restarted during system updates, the issue sometimes manifested
on several brokers holding replicas for the same partition at the same time, which caused downtime
because there were not enough in-sync replicas.
> Restarting the faulted broker is necessary to recover partition replication, but after hitting this
issue we often find that the restarted broker shuts itself down with the following error, among many
warnings about corrupted indices:
> {code:none}
> [2018-03-05 16:02:22,450] ERROR There was an error in one of the threads during logs
loading: org.apache.kafka.common.errors.ProducerFencedException: Invalid producer epoch: 20
(zombie): 21 (current) (kafka.log.LogManager)
> [2018-03-05 16:02:22,453] FATAL [KafkaServer id=10] Fatal error during KafkaServer startup.
Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.kafka.common.errors.ProducerFencedException: Invalid producer epoch: 20 (zombie):
21 (current)
> {code}
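> For context, this error comes from producer epoch fencing: the broker tracks the latest epoch seen
for each producer id and rejects writes from an instance carrying an older ("zombie") epoch. A minimal
sketch of that kind of check (a hypothetical helper, not Kafka's actual code):
> {code:scala}
> import org.apache.kafka.common.errors.ProducerFencedException
> 
> // Hypothetical sketch: an incoming epoch older than the latest known epoch is fenced.
> def checkProducerEpoch(currentEpoch: Short, incomingEpoch: Short): Unit =
>   if (incomingEpoch < currentEpoch)
>     throw new ProducerFencedException(
>       s"Invalid producer epoch: $incomingEpoch (zombie): $currentEpoch (current)")
> {code}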
> When this happened, the only way to keep Kafka up was to delete all the data inside the log
directory (/var/lib/kafka in our case).
> The problem manifests randomly, but we managed to reproduce the ReplicaFetcher crash (although not
the failed startup) outside our production cluster as follows:
>   1 - Set up a Kafka cluster running 3 brokers (see attached configuration): 10, 11 and 12.
>   2 - Create a topic with the following settings: Topic:mytopic2, PartitionCount:12,
ReplicationFactor:3, Configs:segment.bytes=52428800,retention.ms=1800000 (a topic-creation sketch
follows these steps).
>   3 - Run some producers like this:
> {code:bash}
> while true
> do
>   ./kafka-producer-perf-test.sh --topic mytopic2 --record-size=2048 \
>     --producer-props bootstrap.servers=ec2-XXX-XXX-XXX-XXX.eu-west-1.compute.amazonaws.com:9092 enable.idempotence=true \
>     --throughput 50 --num-records 6000 --transactional-id pruebatrans4 --transaction-duration-ms 100
> done
> {code}
>  4 - Run some consumers on mytopic2.
>  5 - Wait some time for segments to be rotated.
>  6 - Stop broker 11, remove everything inside /var/lib/kafka, start it again.
>  7 - Wait for data to be replicated and for all replicas to be in the ISR.
>  8 - Stop broker 12, remove everything inside /var/lib/kafka, start it again.
>  9 - Wait for data to be replicated and for all replicas to be in the ISR.
>  10 - Wait for the issue to manifest. If it does, after some minutes of normal behaviour broker 11
may suddenly stop replicating and some partitions may appear under-replicated.
> If replication after restarting node 12 takes long enough, node 11 may crash its ReplicaFetcher
before the replicas on 12 are available, causing partitions to go offline. We have managed to
reproduce the issue without deleting the log data in steps 6 and 8, but it seems more likely to
manifest if we do. The broker experiencing the issue is quite random, but most of the time it seems
to be one of the already-restarted brokers, though not necessarily the latest one.
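>
> As referenced in step 2, a minimal sketch of creating the topic with those settings through the
AdminClient API (the bootstrap address is a placeholder):
> {code:scala}
> import java.util.{Collections, Properties}
> 
> import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
> 
> object CreateMyTopic2 {
>   def main(args: Array[String]): Unit = {
>     val props = new Properties()
>     props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-10:9092")  // placeholder address
> 
>     val admin = AdminClient.create(props)
>     try {
>       val configs = new java.util.HashMap[String, String]()
>       configs.put("segment.bytes", "52428800")  // 50 MB segments so they roll frequently
>       configs.put("retention.ms", "1800000")    // 30 minute retention
> 
>       // mytopic2 with 12 partitions and replication factor 3, as in step 2
>       val topic = new NewTopic("mytopic2", 12, 3.toShort).configs(configs)
>       admin.createTopics(Collections.singleton(topic)).all().get()
>     } finally {
>       admin.close()
>     }
>   }
> }
> {code}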



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
