flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: NPE with Flink Streaming from Kafka
Date Wed, 02 Dec 2015 10:09:05 GMT
Hi Mihail,
could you please give some information about the number of keys you are expecting in the data, and how big the elements are that you are processing in the window?

Also, are there any other operations that could be taxing on memory? I think the different exception you see with the 500 MB heap is just because Java noticed that it ran out of memory at a different point in the program.
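
For context, a rough back-of-envelope from the numbers below: 5 GB of heap for ~100 million messages is about 50 bytes of retained heap per message, and 500 MB for ~8 million is about 60 bytes per message. That is consistent with state that grows with the number of distinct keys rather than with the size of a single 5-second window.

Below is a minimal sketch of the kind of pipeline the stack traces in this thread suggest (Flink 0.10 Java API; the topic name, JSON field names, and the Tuple2 shape are assumptions, since the actual job code is not in the thread). The point is where memory accumulates: the per-key window aggregates, and any key-value state updated inside the reduce (the AbstractHeapKvState.update frame in the OutOfMemoryError trace), live in hash maps on the TaskManager's JVM heap.

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;

public class ItemPriceAvgPerOrderSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", "localhost:2181");  // assumed
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");  // assumed
        kafkaProps.setProperty("group.id", "item-price-avg");           // assumed

        env.addSource(new FlinkKafkaConsumer082<String>("orders", new SimpleStringSchema(), kafkaProps))
            .map(new MapFunction<String, Tuple2<String, Double>>() {
                @Override
                public Tuple2<String, Double> map(String json) throws Exception {
                    // one new JSONParser per record, as in the heap-space trace below
                    JSONObject order = (JSONObject) new JSONParser().parse(json);
                    return new Tuple2<>((String) order.get("order_id"),
                            ((Number) order.get("price")).doubleValue());
                }
            })
            .keyBy(0)                                  // one heap-state entry per distinct key
            .timeWindow(Time.of(5, TimeUnit.SECONDS))  // "Fast TumblingTimeWindows(5000)"
            .reduce(new ReduceFunction<Tuple2<String, Double>>() {
                @Override
                public Tuple2<String, Double> reduce(Tuple2<String, Double> a,
                                                     Tuple2<String, Double> b) {
                    // aggregates are kept per key in a HashMap on the JVM heap
                    return new Tuple2<>(a.f0, a.f1 + b.f1);
                }
            })
            .writeAsText("file:///tmp/item-price-avg");  // local FS sink, path assumed

        env.execute("ItemPriceAvgPerOrder");
    }
}

Since the heap key-value state is not evicted, heap usage keeps growing as new keys arrive, which would explain why a bigger heap only delays the OOM.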

Cheers,
Aljoscha
> On 02 Dec 2015, at 10:57, Vieru, Mihail <mihail.vieru@zalando.de> wrote:
> 
> Yes, with the "start-cluster-streaming.sh" script.
> If the TaskManager gets 5 GB of heap, it manages to process ~100 million messages and then throws the above OOM.
> If it gets only 500 MB, it manages to process ~8 million before a somewhat misleading exception is thrown:
> 
> 12/01/2015 19:14:07    Source: Custom Source -> Map -> Map(1/1) switched to FAILED
> java.lang.Exception: Java heap space
>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:399)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.OutOfMemoryError: Java heap space
>     at org.json.simple.parser.Yylex.<init>(Yylex.java:231)
>     at org.json.simple.parser.JSONParser.<init>(JSONParser.java:34)
>     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:70)
>     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:65)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:37)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:316)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:300)
>     at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:48)
>     at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:29)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$SourceOutput.collect(SourceStreamTask.java:97)
>     at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:92)
>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:450)
> 
> 2015-12-02 10:45 GMT+01:00 Robert Metzger <rmetzger@apache.org>:
> It's good news that the issue has been resolved.
> 
> Regarding the OOM, did you start Flink in the streaming mode?
> 
> On Wed, Dec 2, 2015 at 10:18 AM, Vieru, Mihail <mihail.vieru@zalando.de> wrote:
> Thank you, Robert! The issue with Kafka is now solved with the 0.10-SNAPSHOT dependency.
> 
> We have run into an OutOfMemory exception though, which appears to be related to the state.
> As my colleague, Javier Lopez, mentioned in a previous thread, state handling is crucial for our use case. And as the jobs are intended to run for months, stability plays an important role in choosing a stream processing framework.
> 
> 12/02/2015 10:03:53    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to FAILED
> java.lang.OutOfMemoryError: Java heap space
>     at java.util.HashMap.resize(HashMap.java:703)
>     at java.util.HashMap.putVal(HashMap.java:662)
>     at java.util.HashMap.put(HashMap.java:611)
>     at org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
>     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:121)
>     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:108)
>     at org.apache.flink.streaming.runtime.operators.windowing.KeyMap.putOrAggregate(KeyMap.java:196)
>     at org.apache.flink.streaming.runtime.operators.windowing.AggregatingKeyedTimePanes.addElementToLatestPane(AggregatingKeyedTimePanes.java:50)
>     at org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.processElement(AbstractAlignedProcessingTimeWindowOperator.java:210)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
> 
> 2015-12-01 17:42 GMT+01:00 Maximilian Michels <mxm@apache.org>:
> Thanks! I've linked the issue in JIRA.
> 
> On Tue, Dec 1, 2015 at 5:39 PM, Robert Metzger <rmetzger@apache.org> wrote:
> > I think it's this one: https://issues.apache.org/jira/browse/KAFKA-824
> >
> > On Tue, Dec 1, 2015 at 5:37 PM, Maximilian Michels <mxm@apache.org> wrote:
> >>
> >> I know this has been fixed already but, out of curiosity, could you
> >> point me to the Kafka JIRA issue for this
> >> bug? From the Flink issue it looks like this is a Zookeeper version
> >> mismatch.
> >>
> >> On Tue, Dec 1, 2015 at 5:16 PM, Robert Metzger <rmetzger@apache.org>
> >> wrote:
> >> > Hi Gyula,
> >> >
> >> > no, I didn't ;) We still deploy 0.10-SNAPSHOT versions from the
> >> > "release-0.10" branch to Apache's maven snapshot repository.
> >> >
> >> >
> >> > I don't think Mihail's code will run when he's compiling it against
> >> > 1.0-SNAPSHOT.
> >> >
> >> >
> >> > On Tue, Dec 1, 2015 at 5:13 PM, Gyula Fóra <gyula.fora@gmail.com> wrote:
> >> >>
> >> >> Hi,
> >> >>
> >> >> I think Robert meant to write that the connector dependency should be set to 1.0-SNAPSHOT.
> >> >>
> >> >> Cheers,
> >> >> Gyula
> >> >>
> >> >> Robert Metzger <rmetzger@apache.org> wrote (on Tue, 1 Dec 2015, 17:10):
> >> >>>
> >> >>> Hi Mihail,
> >> >>>
> >> >>> the issue is actually a bug in Kafka. We have a JIRA in Flink for this as well: https://issues.apache.org/jira/browse/FLINK-3067
> >> >>>
> >> >>> Sadly, we haven't released a fix for it yet. Flink 0.10.2 will contain a fix.
> >> >>>
> >> >>> Since the Kafka connector is not contained in the Flink binary, you can just set the version in your Maven POM file to 0.10-SNAPSHOT. Maven will then download the snapshot code that is planned for the 0.10.2 release.
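> >> >>>
> >> >>> For example, a minimal POM sketch (the artifact name below is an assumption; use whichever Kafka connector artifact your project already references):
> >> >>>
> >> >>>   <dependency>
> >> >>>     <groupId>org.apache.flink</groupId>
> >> >>>     <artifactId>flink-connector-kafka</artifactId>
> >> >>>     <version>0.10-SNAPSHOT</version>
> >> >>>   </dependency>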
> >> >>>
> >> >>> On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail
> >> >>> <mihail.vieru@zalando.de>
> >> >>> wrote:
> >> >>>>
> >> >>>> Hi,
> >> >>>>
> >> >>>> we get the following NullPointerException after ~50 minutes when running a streaming job with windowing and state that reads data from Kafka and writes the result to local FS.
> >> >>>> There are around 170 million messages to be processed; Flink 0.10.1 stops at ~8 million.
> >> >>>> Flink runs locally, started with the "start-cluster-streaming.sh" script.
> >> >>>>
> >> >>>> 12/01/2015 15:06:24    Job execution switched to status RUNNING.
> >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED
> >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING
> >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
> >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
> >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to RUNNING
> >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
> >> >>>> 12/01/2015 15:56:08    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
> >> >>>> 12/01/2015 15:56:08    Source: Custom Source -> Map -> Map(1/1) switched to FAILED
> >> >>>> java.lang.Exception
> >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
> >> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
> >> >>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
> >> >>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
> >> >>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
> >> >>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> >> >>>>     at java.lang.Thread.run(Thread.java:745)
> >> >>>> Caused by: java.lang.NullPointerException
> >> >>>>     at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
> >> >>>>     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
> >> >>>>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> >> >>>>     at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
> >> >>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
> >> >>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
> >> >>>>     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
> >> >>>>     at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
> >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
> >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
> >> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
> >> >>>>
> >> >>>>
> >> >>>> Any ideas on what could cause this behaviour?
> >> >>>>
> >> >>>> Best,
> >> >>>> Mihail
> >> >>>
> >> >
> >
> 

