kafka-jira mailing list archives

From "Scott Reynolds (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-5970) Deadlock due to locking of DelayedProduce and group
Date Wed, 27 Sep 2017 15:59:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-5970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16182779#comment-16182779 ]

Scott Reynolds commented on KAFKA-5970:
---------------------------------------

Ran into a very similar deadlock:

{code}
Found one Java-level deadlock:
=============================
"executor-Heartbeat":
  waiting to lock monitor 0x00007f8c9c297a98 (object 0x000000042f7c55a0, a kafka.coordinator.group.GroupMetadata),
  which is held by "kafka-request-handler-5"
"kafka-request-handler-5":
  waiting to lock monitor 0x00007f8d10125ba8 (object 0x000000042f7b64e8, a kafka.coordinator.group.GroupMetadata),
  which is held by "kafka-request-handler-1"
"kafka-request-handler-1":
  waiting to lock monitor 0x00007f8c9c297a98 (object 0x000000042f7c55a0, a kafka.coordinator.group.GroupMetadata),
  which is held by "kafka-request-handler-5"

Java stack information for the threads listed above:
===================================================
"executor-Heartbeat":
	at kafka.coordinator.group.GroupCoordinator.onExpireHeartbeat(GroupCoordinator.scala:777)
	- waiting to lock <0x000000042f7c55a0> (a kafka.coordinator.group.GroupMetadata)
	at kafka.coordinator.group.DelayedHeartbeat.onExpiration(DelayedHeartbeat.scala:38)
	at kafka.server.DelayedOperation.run(DelayedOperation.scala:113)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
"kafka-request-handler-5":
	at kafka.server.DelayedProduce.safeTryComplete(DelayedProduce.scala:75)
	- waiting to lock <0x000000042f7b64e8> (a kafka.coordinator.group.GroupMetadata)
	at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:338)
	at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:244)
	at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:250)
	at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:418)
	at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:500)
	at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:546)
	at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:532)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:532)
	at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:373)
	at kafka.coordinator.group.GroupMetadataManager.appendForGroup(GroupMetadataManager.scala:239)
	at kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:381)
	at kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:465)
	- locked <0x000000042f7c55a0> (a kafka.coordinator.group.GroupMetadata)
	at kafka.coordinator.group.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:428)
	at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:356)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:105)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
	at java.lang.Thread.run(Thread.java:748)
"kafka-request-handler-1":
	at kafka.server.DelayedProduce.safeTryComplete(DelayedProduce.scala:75)
	- waiting to lock <0x000000042f7c55a0> (a kafka.coordinator.group.GroupMetadata)
	at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:338)
	at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:244)
	at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:250)
	at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:418)
	at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:500)
	at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:546)
	at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:532)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:532)
	at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:373)
	at kafka.coordinator.group.GroupMetadataManager.appendForGroup(GroupMetadataManager.scala:239)
	at kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:381)
	at kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:465)
	- locked <0x000000042f7b64e8> (a kafka.coordinator.group.GroupMetadata)
	at kafka.coordinator.group.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:428)
	at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:356)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:105)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
	at java.lang.Thread.run(Thread.java:748)

Found 1 deadlock.
{code}
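
The cycle in the trace: kafka-request-handler-5 takes group <0x000000042f7c55a0>'s lock in doCommitOffsets and, while appending to __consumer_offsets, walks the produce purgatory and blocks in DelayedProduce.safeTryComplete waiting for group <0x000000042f7b64e8>; kafka-request-handler-1 does the same with the two groups swapped. As a rough illustration (not broker code, just two plain lock objects standing in for the two GroupMetadata instances), the inversion boils down to:

{code}
import java.util.concurrent.CountDownLatch

// Stand-alone sketch of the lock inversion in the trace above; the two plain
// objects play the roles of the two GroupMetadata instances.
object GroupLockInversion {
  def main(args: Array[String]): Unit = {
    val groupA = new Object // ~ GroupMetadata <0x000000042f7c55a0>
    val groupB = new Object // ~ GroupMetadata <0x000000042f7b64e8>
    val bothHeld = new CountDownLatch(2)

    // doCommitOffsets takes its own group's lock; the append it performs then
    // walks the produce purgatory, where safeTryComplete on a DelayedProduce
    // watching the *other* group needs that group's lock as well.
    def commitOffsets(own: AnyRef, other: AnyRef): Unit =
      own.synchronized {            // ~ GroupCoordinator.doCommitOffsets
        bothHeld.countDown()
        bothHeld.await()            // ensure both handlers hold their own lock
        other.synchronized {        // ~ DelayedProduce.safeTryComplete
          println(s"${Thread.currentThread().getName}: completed")
        }
      }

    val h5 = new Thread(new Runnable { def run(): Unit = commitOffsets(groupA, groupB) },
      "kafka-request-handler-5")
    val h1 = new Thread(new Runnable { def run(): Unit = commitOffsets(groupB, groupA) },
      "kafka-request-handler-1")
    h5.start(); h1.start()
    h5.join(2000); h1.join(2000)
    if (h5.isAlive && h1.isAlive)
      println("deadlocked: each handler holds one group lock and waits for the other")
  }
}
{code}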

I believe the change that introduced this regression is this:
https://github.com/apache/kafka/pull/3133

bq. With Guozhang's change in ReplicaManager, it seems we no longer need the delayed store that we were using before to avoid holding the group lock when calling ReplicaManager.append.

This, I think, addresses [~rsivaram]'s question:

bq. I am struggling to understand why a deadlock (without transactions) was not observed before - I don't mean in tests where it is rather unlikely, but by users.
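
For context, my understanding of the "delayed store" mentioned in that PR comment is that, before the change, the offset/group write was only staged while holding the group lock and the actual ReplicaManager append ran after the lock was released, so the append could never complete DelayedProduce operations while a group lock was held. A rough sketch of that ordering (hypothetical names, not the actual pre-PR GroupMetadataManager API):

{code}
// Hypothetical sketch of the "append outside the group lock" ordering; the
// names StagedWrite, commitOffsets and stage are made up for illustration.
object DelayedStoreOrdering {
  final case class StagedWrite(doAppend: () => Unit)

  def commitOffsets(group: AnyRef)(stage: () => StagedWrite): Unit = {
    // Validate the commit and build the records while holding the group lock.
    val staged = group.synchronized { stage() }
    // Perform the append after releasing the lock, so any DelayedProduce
    // completion the append triggers can take other group locks without
    // creating a lock-order cycle.
    staged.doAppend()
  }

  def main(args: Array[String]): Unit = {
    val group = new Object
    commitOffsets(group) { () =>
      println("staging offset records under the group lock")
      StagedWrite(() => println("appending outside the group lock"))
    }
  }
}
{code}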

> Deadlock due to locking of DelayedProduce and group
> ---------------------------------------------------
>
>                 Key: KAFKA-5970
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5970
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Critical
>             Fix For: 1.0.0
>
>         Attachments: jstack.txt
>
>
> From a local run of TransactionsBounceTest. Looks like we hold group lock while completing DelayedProduce, which in turn may acquire group lock.
> {quote}
> Found one Java-level deadlock:
> =============================
> "kafka-request-handler-7":
>   waiting to lock monitor 0x00007fe08891fb08 (object 0x000000074a9fbc50, a kafka.coordinator.group.GroupMetadata),
>   which is held by "kafka-request-handler-4"
> "kafka-request-handler-4":
>   waiting to lock monitor 0x00007fe0869e4408 (object 0x0000000749be7bb8, a kafka.server.DelayedProduce),
>   which is held by "kafka-request-handler-3"
> "kafka-request-handler-3":
>   waiting to lock monitor 0x00007fe08891fb08 (object 0x000000074a9fbc50, a kafka.coordinator.group.GroupMetadata),
>   which is held by "kafka-request-handler-4"
> Java stack information for the threads listed above:
> ===================================================
> "kafka-request-handler-7":
>         at kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:752)
>           waiting to lock <0x000000074a9fbc50> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:750)
>         at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>         at kafka.coordinator.group.GroupMetadataManager.handleTxnCompletion(GroupMetadataManager.scala:750)
>         at kafka.coordinator.group.GroupCoordinator.handleTxnCompletion(GroupCoordinator.scala:439)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$maybeSendResponseCallback$1(KafkaApis.scala:1556)
>         at kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$20.apply(KafkaApis.scala:1614)
>         at kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$20.apply(KafkaApis.scala:1614)
>         at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:134)
>         at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:66)
>         at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:116)
>         at kafka.server.DelayedProduce.safeTryComplete(DelayedProduce.scala:76)
>           locked <0x000000074b21c968> (a kafka.server.DelayedProduce)
>         at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:338)
>         at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:244)
>         at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:284)
>         at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:434)
>         at kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:285)
>         at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:1290)
>         at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:1286)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:1286)
>         at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:786)
>         at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:598)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:100)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65)
>         at java.lang.Thread.run(Thread.java:748)
> "kafka-request-handler-4":
>         at kafka.server.DelayedProduce.safeTryComplete(DelayedProduce.scala:75)
>           waiting to lock <0x0000000749be7bb8> (a kafka.server.DelayedProduce)
>         at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:338)
>         at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:244)
>         at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:284)
>         at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:434)
>         at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:516)
>         at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:707)
>         at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:691)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>         at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:691)
>         at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:441)
>         at kafka.coordinator.group.GroupMetadataManager.appendForGroup(GroupMetadataManager.scala:240)
>         at kafka.coordinator.group.GroupMetadataManager.storeGroup(GroupMetadataManager.scala:228)
>         at kafka.coordinator.group.GroupCoordinator.onCompleteJoin(GroupCoordinator.scala:731)
>           locked <0x000000074a9fbc50> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.coordinator.group.DelayedJoin.onComplete(DelayedJoin.scala:44)
>         at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:66)
>         at kafka.coordinator.group.DelayedJoin$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedJoin.scala:42)
>         at kafka.coordinator.group.GroupCoordinator.tryCompleteJoin(GroupCoordinator.scala:708)
>           locked <0x000000074a9fbc50> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.coordinator.group.DelayedJoin.tryComplete(DelayedJoin.scala:42)
>         at kafka.coordinator.group.DelayedJoin.safeTryComplete(DelayedJoin.scala:40)
>         at kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:199)
>         at kafka.coordinator.group.GroupCoordinator.prepareRebalance(GroupCoordinator.scala:693)
>         at kafka.coordinator.group.GroupCoordinator.kafka$coordinator$group$GroupCoordinator$$maybePrepareRebalance(GroupCoordinator.scala:668)
>           locked <0x000000074a9fbc50> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.coordinator.group.GroupCoordinator.removeMemberAndUpdateGroup(GroupCoordinator.scala:700)
>         at kafka.coordinator.group.GroupCoordinator.handleLeaveGroup(GroupCoordinator.scala:324)
>           locked <0x000000074a9fbc50> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.server.KafkaApis.handleLeaveGroupRequest(KafkaApis.scala:1259)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:112)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65)
>         at java.lang.Thread.run(Thread.java:748)
> "kafka-request-handler-3":
>         at kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:752)
>           waiting to lock <0x000000074a9fbc50> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:750)
>         at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>         at kafka.coordinator.group.GroupMetadataManager.handleTxnCompletion(GroupMetadataManager.scala:750)
>         at kafka.coordinator.group.GroupCoordinator.handleTxnCompletion(GroupCoordinator.scala:439)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$maybeSendResponseCallback$1(KafkaApis.scala:1556)
>         at kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$20.apply(KafkaApis.scala:1614)
>         at kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$20.apply(KafkaApis.scala:1614)
>         at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:134)
>         at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:66)
>         at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:116)
>         at kafka.server.DelayedProduce.safeTryComplete(DelayedProduce.scala:76)
>           locked <0x0000000749be7bb8> (a kafka.server.DelayedProduce)
>         at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:338)
>         at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:244)
>         at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:284)
>         at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:1294)
>         at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:1286)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:1286)
>         at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:786)
>         at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:598)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:100)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65)
>         at java.lang.Thread.run(Thread.java:748)
> Found 1 deadlock.
> {quote}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
