flink-user mailing list archives

From "Vieru, Mihail" <mihail.vi...@zalando.de>
Subject Re: NPE with Flink Streaming from Kafka
Date Wed, 02 Dec 2015 09:57:26 GMT
Yes, with the "start-cluster-streaming.sh" script.
If the TaskManager gets 5GB of heap it manages to process ~100 million
messages and then throws the above OOM.
If it gets only 500MB it manages to process ~8 million and a somewhat
misleading exception is thrown:

12/01/2015 19:14:07    Source: Custom Source -> Map -> Map(1/1) switched to FAILED
java.lang.Exception: Java heap space
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:399)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at org.json.simple.parser.Yylex.<init>(Yylex.java:231)
    at org.json.simple.parser.JSONParser.<init>(JSONParser.java:34)
    at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:70)
    at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:65)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:37)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:316)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:300)
    at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:48)
    at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:29)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$SourceOutput.collect(SourceStreamTask.java:97)
    at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:92)
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:450)
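
A note on the state-related trace quoted further down: the failure there is in HashMap.resize, reached from AbstractHeapKvState.update, so that key/value state lives in a java.util.HashMap on the heap and its footprint grows with the number of distinct keys it holds. The following is only a minimal sketch of that effect in plain Java, not the code of the job discussed here. KeyedAverageState, its methods, and the "order-N" keys are hypothetical names made up for illustration, and whether missing cleanup is what actually exhausts the heap in this job is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for per-key state kept on the JVM heap. */
class KeyedAverageState {
    // key (e.g. an order id) -> {sum of prices, number of prices}
    private final Map<String, double[]> sumAndCount = new HashMap<>();

    /** Folds one price into the running average for the given key. */
    double update(String key, double price) {
        double[] acc = sumAndCount.computeIfAbsent(key, k -> new double[2]);
        acc[0] += price;
        acc[1] += 1;
        return acc[0] / acc[1];
    }

    /** Drops the entry for a key that will not be seen again. */
    void clear(String key) {
        sumAndCount.remove(key);
    }

    /** Number of keys currently held in memory. */
    int size() {
        return sumAndCount.size();
    }
}

class KeyedStateGrowthDemo {
    public static void main(String[] args) {
        KeyedAverageState state = new KeyedAverageState();

        // Every new key adds one entry; nothing is ever released here.
        for (int i = 0; i < 100_000; i++) {
            state.update("order-" + i, 9.99);
        }
        System.out.println("entries without cleanup: " + state.size());

        // Clearing a key once it is finished keeps the map bounded.
        for (int i = 0; i < 100_000; i++) {
            state.clear("order-" + i);
        }
        System.out.println("entries after cleanup: " + state.size());
    }
}
```

Running the demo reports 100000 entries before cleanup and 0 afterwards. In a long-running job the same idea would mean removing the state for a key once that key is known to be complete.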




2015-12-02 10:45 GMT+01:00 Robert Metzger <rmetzger@apache.org>:

> It's good news that the issue has been resolved.
>
> Regarding the OOM, did you start Flink in the streaming mode?
>
> On Wed, Dec 2, 2015 at 10:18 AM, Vieru, Mihail <mihail.vieru@zalando.de>
> wrote:
>
>> Thank you, Robert! The issue with Kafka is now solved with the
>> 0.10-SNAPSHOT dependency.
>>
>> We have run into an OutOfMemory exception though, which appears to be
>> related to the state. As my colleague, Javier Lopez, mentioned in a
>> previous thread, state handling is crucial for our use case. And as the
>> jobs are intended to run for months, stability plays an important role in
>> choosing a stream processing framework.
>>
>> 12/02/2015 10:03:53    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to FAILED
>> java.lang.OutOfMemoryError: Java heap space
>>     at java.util.HashMap.resize(HashMap.java:703)
>>     at java.util.HashMap.putVal(HashMap.java:662)
>>     at java.util.HashMap.put(HashMap.java:611)
>>     at org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
>>     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:121)
>>     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:108)
>>     at org.apache.flink.streaming.runtime.operators.windowing.KeyMap.putOrAggregate(KeyMap.java:196)
>>     at org.apache.flink.streaming.runtime.operators.windowing.AggregatingKeyedTimePanes.addElementToLatestPane(AggregatingKeyedTimePanes.java:50)
>>     at org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.processElement(AbstractAlignedProcessingTimeWindowOperator.java:210)
>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>>
>> 2015-12-01 17:42 GMT+01:00 Maximilian Michels <mxm@apache.org>:
>>
>>> Thanks! I've linked the issue in JIRA.
>>>
>>> On Tue, Dec 1, 2015 at 5:39 PM, Robert Metzger <rmetzger@apache.org>
>>> wrote:
>>> > I think it's this one https://issues.apache.org/jira/browse/KAFKA-824
>>> >
>>> > On Tue, Dec 1, 2015 at 5:37 PM, Maximilian Michels <mxm@apache.org>
>>> wrote:
>>> >>
>>> >> I know this has been fixed already but, out of curiosity, could you
>>> >> point me to the Kafka JIRA issue for this
>>> >> bug? From the Flink issue it looks like this is a Zookeeper version
>>> >> mismatch.
>>> >>
>>> >> On Tue, Dec 1, 2015 at 5:16 PM, Robert Metzger <rmetzger@apache.org>
>>> >> wrote:
>>> >> > Hi Gyula,
>>> >> >
>>> >> > no, I didn't ;) We still deploy 0.10-SNAPSHOT versions from the
>>> >> > "release-0.10" branch to Apache's maven snapshot repository.
>>> >> >
>>> >> >
>>> >> > I don't think Mihail's code will run when he's compiling it against
>>> >> > 1.0-SNAPSHOT.
>>> >> >
>>> >> >
>>> >> > On Tue, Dec 1, 2015 at 5:13 PM, Gyula Fóra <gyula.fora@gmail.com>
>>> wrote:
>>> >> >>
>>> >> >> Hi,
>>> >> >>
>>> >> >> I think Robert meant to write setting the connector dependency to
>>> >> >> 1.0-SNAPSHOT.
>>> >> >>
>>> >> >> Cheers,
>>> >> >> Gyula
>>> >> >>
>>> >> >> Robert Metzger <rmetzger@apache.org> wrote (on Tue, Dec 1, 2015, 17:10):
>>> >> >>>
>>> >> >>> Hi Mihail,
>>> >> >>>
>>> >> >>> the issue is actually a bug in Kafka. We have a JIRA in Flink for this
>>> >> >>> as well: https://issues.apache.org/jira/browse/FLINK-3067
>>> >> >>>
>>> >> >>> Sadly, we haven't released a fix for it yet. Flink 0.10.2 will contain
>>> >> >>> a fix.
>>> >> >>>
>>> >> >>> Since the Kafka connector is not contained in the Flink binary, you can
>>> >> >>> just set the version in your Maven pom file to 0.10-SNAPSHOT. Maven will
>>> >> >>> then download the code planned for the 0.10-SNAPSHOT release.
>>> >> >>>
>>> >> >>> On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail
>>> >> >>> <mihail.vieru@zalando.de>
>>> >> >>> wrote:
>>> >> >>>>
>>> >> >>>> Hi,
>>> >> >>>>
>>> >> >>>> we get the following NullPointerException after ~50 minutes when running
>>> >> >>>> a streaming job with windowing and state that reads data from Kafka and
>>> >> >>>> writes the result to local FS.
>>> >> >>>> There are around 170 million messages to be processed, Flink 0.10.1
>>> >> >>>> stops at ~8 million.
>>> >> >>>> Flink runs locally, started with the "start-cluster-streaming.sh" script.
>>> >> >>>>
>>> >> >>>> 12/01/2015 15:06:24    Job execution switched to status RUNNING.
>>> >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED
>>> >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING
>>> >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
>>> >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
>>> >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to RUNNING
>>> >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
>>> >> >>>> 12/01/2015 15:56:08    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
>>> >> >>>> 12/01/2015 15:56:08    Source: Custom Source -> Map -> Map(1/1) switched to FAILED
>>> >> >>>> java.lang.Exception
>>> >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>> >> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
>>> >> >>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>> >> >>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>> >> >>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>> >> >>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>> >> >>>>     at java.lang.Thread.run(Thread.java:745)
>>> >> >>>> Caused by: java.lang.NullPointerException
>>> >> >>>>     at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
>>> >> >>>>     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
>>> >> >>>>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>>> >> >>>>     at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>>> >> >>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>>> >> >>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>>> >> >>>>     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
>>> >> >>>>     at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
>>> >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
>>> >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
>>> >> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
>>> >> >>>>
>>> >> >>>>
>>> >> >>>> Any ideas on what could cause this behaviour?
>>> >> >>>>
>>> >> >>>> Best,
>>> >> >>>> Mihail
>>> >> >>>
>>> >> >>>
>>> >> >
>>> >
>>> >
>>>
>>
>>
>
