kafka-jira mailing list archives

From "Ismael Juma (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (KAFKA-6043) Kafka 8.1.1 - .ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110) blocked
Date Sat, 28 Oct 2017 15:29:00 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-6043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma resolved KAFKA-6043.
--------------------------------
    Resolution: Auto Closed

The old consumer is deprecated and no further changes are planned. Please upgrade to the Java
consumer whenever possible.

> Kafka 8.1.1 - .ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110) blocked
> ------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6043
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6043
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.8.1.1
>            Reporter: Ramkumar
>         Attachments: 6043.v1, stdout.LRMIID-158150.zip
>
>
> We are using a 3-node Kafka cluster. Around this cluster we have a RESTful service
> that provides HTTP APIs for clients. The service keeps consumer connections in a cache.
> The cache is set to expire after 60 minutes, at which point the consumer connection is
> disconnected.
> However, we see that the ZooKeeper consumer connector thread is blocked and the consumer
> object is still held in the JVM. Can you please let me know whether a solution has been
> identified for this?
> Below is the output from the thread dump when this occurred:
> "pool-25100-thread-1" prio=10 tid=0x00007f711804e820 nid=0x6726 waiting for monitor entry [0x00007f6cd999b000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161)
>         - waiting to lock <0x000000076a922c40> (a java.lang.Object)
>         at kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110)
>         at com.att.nsa.cambria.backends.kafka.KafkaConsumer$2.call(KafkaConsumer.java:207)
>         at com.att.nsa.cambria.backends.kafka.KafkaConsumer$2.call(KafkaConsumer.java:201)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> T--APPC-LCM-READ-E2E_T2_watcher_executor" prio=10 tid=0x00007f7180198030 nid=0x55a2 waiting on condition [0x00007f6c9f4fa000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x000000076b2e1508> (a java.util.concurrent.CountDownLatch$Sync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
>         at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
>         at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
>         at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
>         at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
>         at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
>         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         at kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
>         - locked <0x000000076a922340> (a java.lang.Object)
>         at kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
>         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:524)
>         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:562)
>         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:457)
>         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:408)
>         at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:402)
>         - locked <0x000000076a922c40> (a java.lang.Object)
>         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:355)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
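
Reading the dump above: pool-25100-thread-1 is BLOCKED waiting to lock <0x000000076a922c40>,
and the watcher executor thread holds that same monitor (locked in syncedRebalance) while it
is parked in CountDownLatch.await inside ShutdownableThread.shutdown. The JDK-only sketch
below reproduces that shape so it can be recognised in other dumps. It is an illustration
under stated assumptions, not Kafka code: the class name, the lock object, the latch, and the
thread names are all hypothetical stand-ins.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BlockedShutdownDemo {
    // Hypothetical stand-in for the monitor <0x000000076a922c40> seen in the dump.
    private static final Object rebalanceLock = new Object();

    /**
     * Starts a "rebalance" thread that parks on a latch while holding the lock,
     * then a "shutdown" thread that needs the same lock, and returns the state
     * observed for the shutdown thread before the latch is released.
     */
    static Thread.State observeShutdownState() {
        CountDownLatch fetcherStopped = new CountDownLatch(1); // stand-in for the fetcher's shutdown latch
        CountDownLatch lockHeld = new CountDownLatch(1);

        Thread rebalance = new Thread(() -> {
            synchronized (rebalanceLock) {
                lockHeld.countDown();
                try {
                    fetcherStopped.await(); // parks (WAITING) while still holding the monitor
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "watcher-executor");

        Thread shutdown = new Thread(() -> {
            synchronized (rebalanceLock) {
                // A real shutdown would stop the connector here.
            }
        }, "shutdown-caller");

        try {
            rebalance.start();
            lockHeld.await();   // make sure the rebalance thread owns the lock first
            shutdown.start();

            // Poll until the shutdown thread is stuck on the monitor (or give up after 5 s).
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (shutdown.getState() != Thread.State.BLOCKED && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            Thread.State observed = shutdown.getState();

            fetcherStopped.countDown(); // release the latch so both threads can finish
            rebalance.join();
            shutdown.join();
            return observed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while observing thread state", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("shutdown-caller state: " + observeShutdownState());
    }
}
```

In this sketch the shutdown thread stays BLOCKED for as long as the latch is not counted
down, which matches the two stacks in the dump. Why the latch was never released in the
reported case cannot be determined from these two stacks alone.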



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
