kafka-jira mailing list archives

From "Ben Corlett (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-5970) Deadlock due to locking of DelayedProduce and group
Date Fri, 20 Oct 2017 08:57:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-5970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16212383#comment-16212383 ]

Ben Corlett commented on KAFKA-5970:
------------------------------------

Unfortunately, we had another incident today on broker 125 after applying the changes from
pull request 3956. The changes we applied are here: https://github.com/corlettb/kafka/commits/deadlock


{noformat}
Found one Java-level deadlock:
=============================
"executor-Heartbeat":
  waiting to lock monitor 0x00007fbd8c1834c8 (object 0x000000068cccb590, a kafka.coordinator.group.GroupMetadata),
  which is held by "kafka-request-handler-7"
"kafka-request-handler-7":
  waiting to lock monitor 0x00007fbe1942f698 (object 0x000000068cd2c420, a kafka.coordinator.group.GroupMetadata),
  which is held by "kafka-request-handler-4"
"kafka-request-handler-4":
  waiting to lock monitor 0x00007fbd8c1834c8 (object 0x000000068cccb590, a kafka.coordinator.group.GroupMetadata),
  which is held by "kafka-request-handler-7"

Java stack information for the threads listed above:
===================================================
"executor-Heartbeat":
	at kafka.coordinator.group.GroupCoordinator.onExpireHeartbeat(GroupCoordinator.scala:776)
	- waiting to lock <0x000000068cccb590> (a kafka.coordinator.group.GroupMetadata)
	at kafka.coordinator.group.DelayedHeartbeat.onExpiration(DelayedHeartbeat.scala:34)
	at kafka.server.DelayedOperation.run(DelayedOperation.scala:120)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
"kafka-request-handler-7":
	at kafka.coordinator.group.GroupMetadataManager.putCacheCallback$2(GroupMetadataManager.scala:311)
	- waiting to lock <0x000000068cd2c420> (a kafka.coordinator.group.GroupMetadata)
	at kafka.coordinator.group.GroupMetadataManager.$anonfun$storeOffsets$10(GroupMetadataManager.scala:380)
	at kafka.coordinator.group.GroupMetadataManager.$anonfun$storeOffsets$10$adapted(GroupMetadataManager.scala:380)
	at kafka.coordinator.group.GroupMetadataManager$$Lambda$1045/747223912.apply(Unknown Source)
	at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:124)
	at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:68)
	at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:106)
	at kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:107)
	at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:347)
	at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:253)
	at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:250)
	at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:418)
	at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:500)
	at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:545)
	at kafka.server.ReplicaManager$$Lambda$909/475609331.apply(Unknown Source)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$Lambda$14/1859039536.apply(Unknown Source)
	at scala.collection.immutable.Map$Map1.foreach(Map.scala:120)
	at scala.collection.TraversableLike.map(TraversableLike.scala:234)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:531)
	at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:373)
	at kafka.coordinator.group.GroupMetadataManager.appendForGroup(GroupMetadataManager.scala:245)
	at kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:380)
	at kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:465)
	- locked <0x000000068cccb590> (a kafka.coordinator.group.GroupMetadata)
	at kafka.coordinator.group.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:429)
	at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:361)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:105)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
	at java.lang.Thread.run(Thread.java:748)
"kafka-request-handler-4":
	at kafka.coordinator.group.GroupMetadataManager.putCacheCallback$2(GroupMetadataManager.scala:311)
	- waiting to lock <0x000000068cccb590> (a kafka.coordinator.group.GroupMetadata)
	at kafka.coordinator.group.GroupMetadataManager.$anonfun$storeOffsets$10(GroupMetadataManager.scala:380)
	at kafka.coordinator.group.GroupMetadataManager.$anonfun$storeOffsets$10$adapted(GroupMetadataManager.scala:380)
	at kafka.coordinator.group.GroupMetadataManager$$Lambda$1045/747223912.apply(Unknown Source)
	at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:124)
	at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:68)
	at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:106)
	at kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:107)
	at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:347)
	at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:253)
	at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:250)
	at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:418)
	at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:500)
	at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:545)
	at kafka.server.ReplicaManager$$Lambda$909/475609331.apply(Unknown Source)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$Lambda$14/1859039536.apply(Unknown Source)
	at scala.collection.immutable.Map$Map1.foreach(Map.scala:120)
	at scala.collection.TraversableLike.map(TraversableLike.scala:234)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:531)
	at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:373)
	at kafka.coordinator.group.GroupMetadataManager.appendForGroup(GroupMetadataManager.scala:245)
	at kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:380)
	at kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:465)
	- locked <0x000000068cd2c420> (a kafka.coordinator.group.GroupMetadata)
	at kafka.coordinator.group.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:429)
	at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:361)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:105)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
	at java.lang.Thread.run(Thread.java:748)

Found 1 deadlock.
{noformat}
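
The cycle in the dump above can be reduced to a small sketch: each request handler holds one group's lock (taken in doCommitOffsets) and then, while completing a watched DelayedProduce, blocks on a different group's lock in putCacheCallback. The Java below is not Kafka code. `Group`, `DelayedOp` and this `maybeTryComplete` are hypothetical stand-ins, and skipping a busy lock with `tryLock` is only one possible way to break the cycle, assumed here for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class LockOrderSketch {
    /** Hypothetical stand-in for per-group state guarded by its own lock. */
    static final class Group {
        final ReentrantLock lock = new ReentrantLock();
        int completedCallbacks = 0;
    }

    /** Hypothetical delayed operation whose completion callback needs the group's lock. */
    static final class DelayedOp {
        private final Group target;
        private final AtomicBoolean completed = new AtomicBoolean(false);

        DelayedOp(Group target) { this.target = target; }

        /**
         * Completes the operation only if the target group's lock is free.
         * Returns false instead of blocking, so a caller that already holds
         * another group's lock can never become part of a wait cycle here.
         */
        boolean maybeTryComplete() {
            if (completed.get() || !target.lock.tryLock()) {
                return false; // already done, or lock busy: leave it for a later attempt
            }
            try {
                if (completed.compareAndSet(false, true)) {
                    target.completedCallbacks++; // the callback body, run under the group lock
                    return true;
                }
                return false;
            } finally {
                target.lock.unlock();
            }
        }
    }

    /** Returns {result while group B is held elsewhere, result after B is released}. */
    static boolean[] demo() {
        Group a = new Group();
        Group b = new Group();
        DelayedOp opOnB = new DelayedOp(b);
        CountDownLatch bHeld = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        // Plays the role of the other request handler: it holds group B's lock.
        Thread other = new Thread(() -> {
            b.lock.lock();
            try {
                bHeld.countDown();
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                b.lock.unlock();
            }
        });

        try {
            other.start();
            bHeld.await();

            boolean whileBusy;
            a.lock.lock(); // this thread holds group A, like a handler inside a commit
            try {
                whileBusy = opOnB.maybeTryComplete(); // B is busy: returns instead of blocking
            } finally {
                a.lock.unlock();
            }

            release.countDown();
            other.join();
            boolean afterRelease = opOnB.maybeTryComplete(); // B is free now
            return new boolean[] { whileBusy, afterRelease };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("completed while B was held: " + r[0]);
        System.out.println("completed after B released: " + r[1]);
    }
}
```

With a blocking `lock()` in place of `tryLock()`, two handlers that each hold their own group's lock and each try to complete an operation on the other's group would wait on each other indefinitely, which is the shape of the report above. The trade-off in this sketch is that a skipped operation is not completed on that attempt and relies on a later attempt to finish it.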

> Deadlock due to locking of DelayedProduce and group
> ---------------------------------------------------
>
>                 Key: KAFKA-5970
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5970
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Blocker
>             Fix For: 1.0.0, 0.11.0.2
>
>         Attachments: jstack.txt
>
>
> From a local run of TransactionsBounceTest. Looks like we hold group lock while completing
> DelayedProduce, which in turn may acquire group lock.
> {quote}
> Found one Java-level deadlock:
> =============================
> "kafka-request-handler-7":
>   waiting to lock monitor 0x00007fe08891fb08 (object 0x000000074a9fbc50, a kafka.coordinator.group.GroupMetadata),
>   which is held by "kafka-request-handler-4"
> "kafka-request-handler-4":
>   waiting to lock monitor 0x00007fe0869e4408 (object 0x0000000749be7bb8, a kafka.server.DelayedProduce),
>   which is held by "kafka-request-handler-3"
> "kafka-request-handler-3":
>   waiting to lock monitor 0x00007fe08891fb08 (object 0x000000074a9fbc50, a kafka.coordinator.group.GroupMetadata),
>   which is held by "kafka-request-handler-4"
> Java stack information for the threads listed above:
> ===================================================
> "kafka-request-handler-7":
>         at kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:752)
>           waiting to lock <0x000000074a9fbc50> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:750)
>         at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>         at kafka.coordinator.group.GroupMetadataManager.handleTxnCompletion(GroupMetadataManager.scala:750)
>         at kafka.coordinator.group.GroupCoordinator.handleTxnCompletion(GroupCoordinator.scala:439)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$maybeSendResponseCallback$1(KafkaApis.scala:1556)
>         at kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$20.apply(KafkaApis.scala:1614)
>         at kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$20.apply(KafkaApis.scala:1614)
>         at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:134)
>         at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:66)
>         at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:116)
>         at kafka.server.DelayedProduce.safeTryComplete(DelayedProduce.scala:76)
>           locked <0x000000074b21c968> (a kafka.server.DelayedProduce)
>         at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:338)
>         at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:244)
>         at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:284)
>         at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:434)
>         at kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:285)
>         at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:1290)
>         at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:1286)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:1286)
>         at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:786)
>         at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:598)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:100)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65)
>         at java.lang.Thread.run(Thread.java:748)
> "kafka-request-handler-4":
>         at kafka.server.DelayedProduce.safeTryComplete(DelayedProduce.scala:75)
>           waiting to lock <0x0000000749be7bb8> (a kafka.server.DelayedProduce)
>         at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:338)
>         at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:244)
>         at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:284)
>         at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:434)
>         at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:516)
>         at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:707)
>         at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:691)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>         at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:691)
>         at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:441)
>         at kafka.coordinator.group.GroupMetadataManager.appendForGroup(GroupMetadataManager.scala:240)
>         at kafka.coordinator.group.GroupMetadataManager.storeGroup(GroupMetadataManager.scala:228)
>         at kafka.coordinator.group.GroupCoordinator.onCompleteJoin(GroupCoordinator.scala:731)
>           locked <0x000000074a9fbc50> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.coordinator.group.DelayedJoin.onComplete(DelayedJoin.scala:44)
>         at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:66)
>         at kafka.coordinator.group.DelayedJoin$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedJoin.scala:42)
>         at kafka.coordinator.group.GroupCoordinator.tryCompleteJoin(GroupCoordinator.scala:708)
>           locked <0x000000074a9fbc50> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.coordinator.group.DelayedJoin.tryComplete(DelayedJoin.scala:42)
>         at kafka.coordinator.group.DelayedJoin.safeTryComplete(DelayedJoin.scala:40)
>         at kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:199)
>         at kafka.coordinator.group.GroupCoordinator.prepareRebalance(GroupCoordinator.scala:693)
>         at kafka.coordinator.group.GroupCoordinator.kafka$coordinator$group$GroupCoordinator$$maybePrepareRebalance(GroupCoordinator.scala:668)
>           locked <0x000000074a9fbc50> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.coordinator.group.GroupCoordinator.removeMemberAndUpdateGroup(GroupCoordinator.scala:700)
>         at kafka.coordinator.group.GroupCoordinator.handleLeaveGroup(GroupCoordinator.scala:324)
>           locked <0x000000074a9fbc50> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.server.KafkaApis.handleLeaveGroupRequest(KafkaApis.scala:1259)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:112)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65)
>         at java.lang.Thread.run(Thread.java:748)
> "kafka-request-handler-3":
>         at kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:752)
>           waiting to lock <0x000000074a9fbc50> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:750)
>         at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>         at kafka.coordinator.group.GroupMetadataManager.handleTxnCompletion(GroupMetadataManager.scala:750)
>         at kafka.coordinator.group.GroupCoordinator.handleTxnCompletion(GroupCoordinator.scala:439)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$maybeSendResponseCallback$1(KafkaApis.scala:1556)
>         at kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$20.apply(KafkaApis.scala:1614)
>         at kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$20.apply(KafkaApis.scala:1614)
>         at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:134)
>         at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:66)
>         at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:116)
>         at kafka.server.DelayedProduce.safeTryComplete(DelayedProduce.scala:76)
>           locked <0x0000000749be7bb8> (a kafka.server.DelayedProduce)
>         at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:338)
>         at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:244)
>         at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:284)
>         at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:1294)
>         at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:1286)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:1286)
>         at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:786)
>         at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:598)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:100)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65)
>         at java.lang.Thread.run(Thread.java:748)
> Found 1 deadlock.
> {quote}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
