nifi-dev mailing list archives

From "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <chris.mcderm...@hpe.com>
Subject Re: GetKafka blowing up with assertion error in Kafka client code
Date Thu, 28 Apr 2016 16:41:26 GMT
Oleg,

I have reproduced the problem.  It's pretty easy to do: just delete and recreate the topic
while the processor is running.  I think I saw a similar problem when I increased the number
of partitions in the topic.  That problem resolved itself when I restarted the GetKafka processors.
However, restarting the processor does not resolve this one. It must be that something
is being stored in ZooKeeper.  I am guessing that deleting and recreating the processor will
do the trick.  Is there any debugging information I can provide to you?

Thanks,
Chris



On 4/14/16, 8:32 PM, "Oleg Zhurakousky" <ozhurakousky@hortonworks.com> wrote:

>Thanks Chris
>
>Indeed, let us know if/when/how to reproduce it so we can evaluate whether it is something
>we can validate or handle in NiFi before it is passed to Kafka.
>
>Cheers
>Oleg
>
>> On Apr 14, 2016, at 8:25 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <chris.mcdermott@hpe.com> wrote:
>> 
>> I looked at the Kafka client code and it seemed to me to be a bug in the caller.
>> A map is passed in that maps topics to the number of consumers. In this case it is asserting that
>> the number of consumers is greater than zero. If I can repro the problem I'll try to isolate
>> it in the debugger and provide more details.
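The check Chris describes (each topic in the map must have a consumer count greater than zero) could in principle be enforced by the caller before the map is handed to the Kafka client. The following is a minimal sketch of such a guard, not NiFi's or Kafka's actual code: the class TopicCountCheck and the method requirePositiveCounts are hypothetical names, and whether GetKafka ever passes a non-positive count in the delete-and-recreate scenario is not established in this thread.

```java
import java.util.Collections;
import java.util.Map;

public class TopicCountCheck {

    // Hypothetical helper: rejects a topic -> consumer-count map that contains
    // a missing or non-positive count, so the caller fails with a descriptive
    // message instead of a bare "assertion failed" from the client library.
    static void requirePositiveCounts(Map<String, Integer> topicCountMap) {
        for (Map.Entry<String, Integer> entry : topicCountMap.entrySet()) {
            Integer count = entry.getValue();
            if (count == null || count < 1) {
                throw new IllegalArgumentException(
                        "Topic '" + entry.getKey()
                        + "' needs at least 1 consumer, got " + count);
            }
        }
    }

    public static void main(String[] args) {
        // A count of 1 passes silently.
        requirePositiveCounts(Collections.singletonMap("events", 1));

        // A count of 0 is rejected before any Kafka call would be made.
        try {
            requirePositiveCounts(Collections.singletonMap("events", 0));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A guard like this only turns the failure into a clearer error; it does not explain why the count would be non-positive in the first place.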
>> 
>> 
>> 
>> 
>> -------- Original message --------
>> From: Oleg Zhurakousky <ozhurakousky@hortonworks.com>
>> Date: 4/14/16 4:14 PM (GMT-05:00)
>> To: dev@nifi.apache.org
>> Subject: Re: GetKafka blowing up with assertion error in Kafka client code
>> 
>> Chris
>> That is correct, and for a change I am pretty happy to see this stack trace, as it
>> clearly shows the problem and validates the approach we have.
>> So here are more details...
>> 
>> The root failure is in Kafka (as you can see from the stack trace). All we are doing
>> is encapsulating the interaction with Kafka in a cancelable Future so we can cancel if and when
>> Kafka deadlocks (which we have noticed happens rather often).
>> When we execute Future.get(), it results in an ExecutionException, which carries the original
>> Kafka exception (AssertionError).
>> Now, I am not sure what that assertion error really means in the context of what you
>> are trying to do, but it's clearly a problem that originated in Kafka.
>> Could you share your config or any other details?
>> 
>> Cheers
>> Oleg
>> 
>>> On Apr 14, 2016, at 4:00 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <chris.mcdermott@hpe.com> wrote:
>>> 
>>> I’m running a build based on 0.7.0-SNAPSHOT.  The GetKafka config is pretty generic:
>>> batch size 1, 1 concurrent task.
>>> 
>>> 
>>> 2016-04-14 19:27:23,204 ERROR [Timer-Driven Process Thread-9] o.apache.nifi.processors.kafka.GetKafka
>>> java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.AssertionError: assertion failed
>>>       at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:355) ~[na:na]
>>>       at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>       at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>       at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>       at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>       at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
>>>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
>>>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
>>>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
>>>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
>>>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
>>>       at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
>>> Caused by: java.util.concurrent.ExecutionException: java.lang.AssertionError: assertion failed
>>>       at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_45]
>>>       at java.util.concurrent.FutureTask.get(FutureTask.java:206) [na:1.8.0_45]
>>>       at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:348) ~[na:na]
>>>       ... 12 common frames omitted
>>> Caused by: java.lang.AssertionError: assertion failed
>>>       at scala.Predef$.assert(Predef.scala:165) ~[na:na]
>>>       at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:51) ~[na:na]
>>>       at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:49) ~[na:na]
>>>       at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) ~[na:na]
>>>       at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) ~[na:na]
>>>       at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) ~[na:na]
>>>       at kafka.consumer.TopicCount$.makeConsumerThreadIdsPerTopic(TopicCount.scala:49) ~[na:na]
>>>       at kafka.consumer.StaticTopicCount.getConsumerThreadIdsPerTopic(TopicCount.scala:113) ~[na:na]
>>>       at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:226) ~[na:na]
>>>       at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85) ~[na:na]
>>>       at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97) ~[na:na]
>>>       at org.apache.nifi.processors.kafka.GetKafka.createConsumers(GetKafka.java:281) ~[na:na]
>>>       at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:343) ~[na:na]
>>>       at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:340) ~[na:na]
>>>       at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_45]
>>>       ... 3 common frames omitted
>> 
>> 
>
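The nested "Caused by" chain in the trace above follows from the pattern Oleg describes: the Kafka call runs inside a Future, and Future.get() reports any failure wrapped in an ExecutionException. The sketch below illustrates that pattern with the standard java.util.concurrent API only. It is not the GetKafka source; the class FutureUnwrapSketch, the method runGuarded, and the timeout value are assumptions made for illustration, and the thrown AssertionError merely stands in for the Kafka failure.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureUnwrapSketch {

    // Hypothetical helper: runs the task on a separate thread so a hung call
    // can be cancelled, and reports the original failure (not the wrapper).
    static String runGuarded(Callable<Void> task, long timeoutMillis)
            throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Void> future = executor.submit(task);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return "ok";
        } catch (TimeoutException e) {
            // The call did not return in time: interrupt it and give up.
            future.cancel(true);
            return "timed out";
        } catch (ExecutionException e) {
            // getCause() is whatever the task threw, including Errors
            // such as AssertionError.
            Throwable cause = e.getCause();
            return cause.getClass().getName() + ": " + cause.getMessage();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the failing Kafka call.
        String result = runGuarded(() -> {
            throw new AssertionError("assertion failed");
        }, 1000);
        System.out.println(result);
    }
}
```

Unwrapping the cause this way makes it easier to log the underlying error directly rather than a chain of wrappers.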