kafka-jira mailing list archives

From "Peter Davis (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-5970) Deadlock due to locking of DelayedProduce and group
Date Mon, 25 Sep 2017 15:38:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-5970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16179184#comment-16179184 ]

Peter Davis commented on KAFKA-5970:
------------------------------------

[~rsivaram] Thanks for looking into this so quickly.  We agree, it looks like the same code path.

Theorizing about our problem and whether this is the smoking gun: it's possible this deadlock
is an effect of our issue rather than its cause.  As I mentioned, we're seeing a broker repeatedly
end up as the only member of the ISR for partitions it leads, including some __consumer_offsets
partitions.  If replication to __consumer_offsets failed first, then the GroupCoordinator's
internal produce would hang while publishing committed offsets, and we might see exactly this.  On the
other hand, a deadlocked GroupCoordinator that ties up request handler threads could
have all kinds of cascading effects, right?  Sigh...
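
(Side note for anyone trying to correlate the two symptoms: a quick way to watch for ISR shrinkage on the offsets topic is the 0.11+ AdminClient.  This is just a throwaway sketch, not something from this ticket; the bootstrap address is a placeholder.)

{code}
// Throwaway check: list __consumer_offsets partitions whose ISR is smaller
// than the full replica set (i.e. partitions that have lost followers).
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import scala.collection.JavaConverters._

object OffsetsTopicIsrCheck extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
  val admin = AdminClient.create(props)
  try {
    val desc = admin.describeTopics(Collections.singletonList("__consumer_offsets"))
      .all().get().get("__consumer_offsets")
    desc.partitions().asScala
      .filter(p => p.isr().size() < p.replicas().size())
      .foreach { p =>
        println(s"partition ${p.partition()}: " +
          s"isr=${p.isr().asScala.map(_.id()).mkString(",")} " +
          s"of ${p.replicas().size()} replicas")
      }
  } finally admin.close()
}
{code}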

We are just spinning here trying to work out how this is biting us so badly.  If the GroupCoordinator
can deadlock merely on committing offsets, that would be extremely serious and would affect a whole
lot more people, right?
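
To make the shape of the cycle concrete for anyone skimming the jstack below: one path takes the group's monitor and then tries the DelayedProduce's, while the other takes them in the opposite order.  Here's a tiny standalone sketch of just that inversion (made-up names, obviously not Kafka code):

{code}
// Toy reproduction of the lock-order inversion only.
object LockOrderInversionSketch extends App {
  private val groupLock   = new Object // stands in for the GroupMetadata monitor
  private val produceLock = new Object // stands in for the DelayedProduce monitor

  private def handler(name: String)(body: => Unit): Thread =
    new Thread(new Runnable { def run(): Unit = body }, name)

  // Like kafka-request-handler-4 in the dump: group lock first
  // (handleLeaveGroup -> onCompleteJoin -> appendRecords), then it tries
  // to complete a DelayedProduce and needs that monitor too.
  val a = handler("request-handler-A") {
    groupLock.synchronized {
      Thread.sleep(100) // widen the race window
      produceLock.synchronized { println("A completed the delayed produce") }
    }
  }

  // Like kafka-request-handler-3: DelayedProduce monitor first
  // (safeTryComplete), then the txn-completion callback needs the group lock.
  val b = handler("request-handler-B") {
    produceLock.synchronized {
      Thread.sleep(100)
      groupLock.synchronized { println("B ran the completion callback") }
    }
  }

  a.start(); b.start()
  a.join(); b.join() // once both sleeps overlap, neither join ever returns
}
{code}

Run that under jstack and you get the same "waiting to lock ... which is held by ..." cycle between the two threads.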

> Deadlock due to locking of DelayedProduce and group
> ---------------------------------------------------
>
>                 Key: KAFKA-5970
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5970
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Critical
>             Fix For: 1.0.0
>
>         Attachments: jstack.txt
>
>
> From a local run of TransactionsBounceTest. Looks like we hold group lock while completing
> DelayedProduce, which in turn may acquire group lock.
> {quote}
> Found one Java-level deadlock:
> =============================
> "kafka-request-handler-7":
>   waiting to lock monitor 0x00007fe08891fb08 (object 0x000000074a9fbc50, a kafka.coordinator.group.GroupMetadata),
>   which is held by "kafka-request-handler-4"
> "kafka-request-handler-4":
>   waiting to lock monitor 0x00007fe0869e4408 (object 0x0000000749be7bb8, a kafka.server.DelayedProduce),
>   which is held by "kafka-request-handler-3"
> "kafka-request-handler-3":
>   waiting to lock monitor 0x00007fe08891fb08 (object 0x000000074a9fbc50, a kafka.coordinator.group.GroupMetadata),
>   which is held by "kafka-request-handler-4"
> Java stack information for the threads listed above:
> ===================================================
> "kafka-request-handler-7":
>         at kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:752)
>           waiting to lock <0x000000074a9fbc50> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:750)
>         at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>         at kafka.coordinator.group.GroupMetadataManager.handleTxnCompletion(GroupMetadataManager.scala:750)
>         at kafka.coordinator.group.GroupCoordinator.handleTxnCompletion(GroupCoordinator.scala:439)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$maybeSendResponseCallback$1(KafkaApis.scala:1556)
>         at kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$20.apply(KafkaApis.scala:1614)
>         at kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$20.apply(KafkaApis.scala:1614)
>         at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:134)
>         at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:66)
>         at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:116)
>         at kafka.server.DelayedProduce.safeTryComplete(DelayedProduce.scala:76)
>           locked <0x000000074b21c968> (a kafka.server.DelayedProduce)
>         at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:338)
>         at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:244)
>         at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:284)
>         at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:434)
>         at kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:285)
>         at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:1290)
>         at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:1286)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:1286)
>         at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:786)
>         at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:598)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:100)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65)
>         at java.lang.Thread.run(Thread.java:748)
> "kafka-request-handler-4":
>         at kafka.server.DelayedProduce.safeTryComplete(DelayedProduce.scala:75)
>           waiting to lock <0x0000000749be7bb8> (a kafka.server.DelayedProduce)
>         at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:338)
>         at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:244)
>         at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:284)
>         at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:434)
>         at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:516)
>         at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:707)
>         at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:691)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>         at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:691)
>         at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:441)
>         at kafka.coordinator.group.GroupMetadataManager.appendForGroup(GroupMetadataManager.scala:240)
>         at kafka.coordinator.group.GroupMetadataManager.storeGroup(GroupMetadataManager.scala:228)
>         at kafka.coordinator.group.GroupCoordinator.onCompleteJoin(GroupCoordinator.scala:731)
>           locked <0x000000074a9fbc50> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.coordinator.group.DelayedJoin.onComplete(DelayedJoin.scala:44)
>         at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:66)
>         at kafka.coordinator.group.DelayedJoin$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedJoin.scala:42)
>         at kafka.coordinator.group.GroupCoordinator.tryCompleteJoin(GroupCoordinator.scala:708)
>           locked <0x000000074a9fbc50> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.coordinator.group.DelayedJoin.tryComplete(DelayedJoin.scala:42)
>         at kafka.coordinator.group.DelayedJoin.safeTryComplete(DelayedJoin.scala:40)
>         at kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:199)
>         at kafka.coordinator.group.GroupCoordinator.prepareRebalance(GroupCoordinator.scala:693)
>         at kafka.coordinator.group.GroupCoordinator.kafka$coordinator$group$GroupCoordinator$$maybePrepareRebalance(GroupCoordinator.scala:668)
>           locked <0x000000074a9fbc50> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.coordinator.group.GroupCoordinator.removeMemberAndUpdateGroup(GroupCoordinator.scala:700)
>         at kafka.coordinator.group.GroupCoordinator.handleLeaveGroup(GroupCoordinator.scala:324)
>           locked <0x000000074a9fbc50> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.server.KafkaApis.handleLeaveGroupRequest(KafkaApis.scala:1259)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:112)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65)
>         at java.lang.Thread.run(Thread.java:748)
> "kafka-request-handler-3":
>         at kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:752)
>           waiting to lock <0x000000074a9fbc50> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:750)
>         at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>         at kafka.coordinator.group.GroupMetadataManager.handleTxnCompletion(GroupMetadataManager.scala:750)
>         at kafka.coordinator.group.GroupCoordinator.handleTxnCompletion(GroupCoordinator.scala:439)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$maybeSendResponseCallback$1(KafkaApis.scala:1556)
>         at kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$20.apply(KafkaApis.scala:1614)
>         at kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$20.apply(KafkaApis.scala:1614)
>         at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:134)
>         at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:66)
>         at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:116)
>         at kafka.server.DelayedProduce.safeTryComplete(DelayedProduce.scala:76)
>           locked <0x0000000749be7bb8> (a kafka.server.DelayedProduce)
>         at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:338)
>         at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:244)
>         at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:284)
>         at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:1294)
>         at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:1286)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:1286)
>         at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:786)
>         at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:598)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:100)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65)
>         at java.lang.Thread.run(Thread.java:748)
> Found 1 deadlock.
> {quote}



