nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Oleg Zhurakousky (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (NIFI-1684) When the Kafka broker and Zookeeper instance used are replaced behavior is poor
Date Thu, 24 Mar 2016 18:35:25 GMT

    [ https://issues.apache.org/jira/browse/NIFI-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15210733#comment-15210733
] 

Oleg Zhurakousky commented on NIFI-1684:
----------------------------------------

One other issue
{code}
java.lang.NullPointerException: null
at org.apache.nifi.processors.kafka.PutKafka.buildMessageContext(PutKafka.java:411) ~[na:na]
at org.apache.nifi.processors.kafka.PutKafka.onTrigger(PutKafka.java:293) ~[na:na]
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.5.1.1.1.2.1-37.jar:0.5.1.1.1.2.1-37]
. . . 
{code}

> When the Kafka broker and Zookeeper instance used are replaced behavior is poor
> -------------------------------------------------------------------------------
>
>                 Key: NIFI-1684
>                 URL: https://issues.apache.org/jira/browse/NIFI-1684
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 0.6.0
>            Reporter: Joseph Witt
>            Assignee: Oleg Zhurakousky
>             Fix For: 0.7.0
>
>
> The Kafka broker and zookeeper instance being used were replaced while NiFi was running.
> During that time PutKafka had NullPointerExceptions such as:
> {quote}
> 2016-03-24 05:12:58,427 WARN [Timer-Driven Process Thread-5] o.a.n.c.t.ContinuallyRunProcessorTask
Administratively Yielding PutKafka[id=f8b2f669-fec5-3b26-ad2b-bca
> dff0c6543] due to uncaught Exception: java.lang.NullPointerException
> 2016-03-24 05:12:58,429 WARN [Timer-Driven Process Thread-5] o.a.n.c.t.ContinuallyRunProcessorTask
> java.lang.NullPointerException: null
>         at java.lang.String.<init>(String.java:566) ~[na:1.8.0_65]
>         at org.apache.nifi.processors.kafka.SplittableMessageContext.getKeyBytesAsString(SplittableMessageContext.java:105)
~[na:na]
>         at org.apache.nifi.processors.kafka.PutKafka.buildFailedFlowFileAttributes(PutKafka.java:395)
~[na:na]
>         at org.apache.nifi.processors.kafka.PutKafka.onTrigger(PutKafka.java:308) ~[na:na]
>         at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
~[nifi-api-0.6.0.jar:0.6.0]
>         at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057)
~[nifi-framework-core-0.6.0.jar:0.6.0]
>         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
[nifi-framework-core-0.6.0.jar:0.6.0]
>         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
[nifi-framework-core-0.6.0.jar:0.6.0]
>         at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
[nifi-framework-core-0.6.0.jar:0.6.0]
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_65]
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_65]
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
[na:1.8.0_65]
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
[na:1.8.0_65]
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[na:1.8.0_65]
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[na:1.8.0_65]
>         at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65]
> {quote}
> But, GetKafka had no errors, stopped functioning, and became unresponsive to attempts
to stop it.  The 30 sec invoke quietly mechanism didn't seem to address the issue either presumably
because the thread is stuck on some object monitor.  I tried to stop it so I could restart
it but in attempting to start it NiFi blocked me saying it was in STOPPING state.  So stack
dump taken and this appears relevant:
> {quote}
> "StandardProcessScheduler Thread-8" Id=142 BLOCKED  on java.lang.Object@17820e22
>         at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:295)
>         at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:111)
>         at org.apache.nifi.processors.kafka.GetKafka.shutdownConsumer(GetKafka.java:296)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:137)
>         at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:125)
>         at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotations(ReflectionUtils.java:233)
>         at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotation(ReflectionUtils.java:85)
>         at org.apache.nifi.controller.StandardProcessorNode$2.run(StandardProcessorNode.java:1330)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>         Number of Locked Synchronizers: 1
>         - java.util.concurrent.ThreadPoolExecutor$Worker@1f4d2630
> {quote}
> Then here i see the reference to that object in another stack:
> {quote}
> "kafka-consumer-scheduler-0" Id=213 TIMED_WAITING  on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@5e93adf
>         at sun.misc.Unsafe.park(Native Method)
>         at java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:256)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2120)
>         at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
>         at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
>         at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
>         at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)
>         at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>         at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>         at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>         at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:326)
>         at kafka.consumer.ZookeeperConsumerConnector.commitOffsetToZooKeeper(ZookeeperConsumerConnector.scala:283)
>         at kafka.consumer.ZookeeperConsumerConnector$$anonfun$5.apply(ZookeeperConsumerConnector.scala:304)
>         at kafka.consumer.ZookeeperConsumerConnector$$anonfun$5.apply(ZookeeperConsumerConnector.scala:303)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>         at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:303)
>         - waiting on java.lang.Object@17820e22
>         at kafka.consumer.ZookeeperConsumerConnector.autoCommit(ZookeeperConsumerConnector.scala:271)
>         at kafka.consumer.ZookeeperConsumerConnector$$anonfun$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:134)
>         at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:99)
>         at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>         Number of Locked Synchronizers: 1
>         - java.util.concurrent.ThreadPoolExecutor$Worker@4a713b18
> {quote}
> Which is waiting on 
> {quote}
> "9e31cdb4-9685-4b04-9164-1d737edd5f31_52.90.60.74-1458794505915-6302e1bb-leader-finder-thread"
Id=233 TIMED_WAITING  on java.util.concurrent.locks.AbstractQueuedSyn
> chronizer$ConditionObject@5e93adf
>         at sun.misc.Unsafe.park(Native Method)
>         at java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:256)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2120)
>         at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
>         at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
>         at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
>         at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)
>         at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:413)
>         at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:409)
>         at kafka.utils.ZkUtils$.getChildrenParentMayNotExist(ZkUtils.scala:469)
>         at kafka.utils.ZkUtils$.getAllBrokersInCluster(ZkUtils.scala:81)
>         at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:65)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>         Number of Locked Synchronizers: 1
>         - java.util.concurrent.locks.ReentrantLock$NonfairSync@50bc97c7
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message