flink-user mailing list archives

From Robert Metzger <rmetz...@apache.org>
Subject Re: NPE with Flink Streaming from Kafka
Date Wed, 02 Dec 2015 09:45:59 GMT
It's good news that the issue has been resolved.

Regarding the OOM: did you start Flink in streaming mode?

On Wed, Dec 2, 2015 at 10:18 AM, Vieru, Mihail <mihail.vieru@zalando.de>
wrote:

> Thank you, Robert! The issue with Kafka is now solved with the
> 0.10-SNAPSHOT dependency.
>
> We have run into an OutOfMemoryError, though, which appears to be
> related to state. As my colleague Javier Lopez mentioned in a
> previous thread, state handling is crucial for our use case. And since the
> jobs are intended to run for months, stability plays an important role in
> choosing a stream processing framework.
>
> 12/02/2015 10:03:53    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to FAILED
> java.lang.OutOfMemoryError: Java heap space
>     at java.util.HashMap.resize(HashMap.java:703)
>     at java.util.HashMap.putVal(HashMap.java:662)
>     at java.util.HashMap.put(HashMap.java:611)
>     at org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
>     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:121)
>     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:108)
>     at org.apache.flink.streaming.runtime.operators.windowing.KeyMap.putOrAggregate(KeyMap.java:196)
>     at org.apache.flink.streaming.runtime.operators.windowing.AggregatingKeyedTimePanes.addElementToLatestPane(AggregatingKeyedTimePanes.java:50)
>     at org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.processElement(AbstractAlignedProcessingTimeWindowOperator.java:210)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
>
>
>
>
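The OOM trace above shows HashMap.put being called from AbstractHeapKvState.update inside the job's own reduce function, so the job writes heap-backed key/value state from within the windowed reduce. If that state is keyed by something unbounded, such as an order ID (an assumption based only on the class name ItemPriceAvgPerOrder), every new key adds a map entry that nothing removes, while the window panes are discarded each time a window fires. The plain-Java sketch below (no Flink dependency) contrasts the two. HeapKvState, simulate, and the clear call are hypothetical stand-ins, not Flink 0.10 API; clear only represents whatever cleanup such a job would need.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration (no Flink dependency). All names are hypothetical;
// HeapKvState only mimics the HashMap.put-inside-update pattern in the trace.
public class HeapStateGrowth {

    /** Stand-in for a heap-backed key/value state: one map entry per key. */
    static final class HeapKvState {
        private final Map<String, Double> values = new HashMap<>();

        void update(String key, double value) { values.put(key, value); }

        void clear(String key) { values.remove(key); }

        int size() { return values.size(); }
    }

    /**
     * Runs `windows` tumbling windows, each seeing `ordersPerWindow` new order IDs.
     * Returns {largest pane size, state entries left at the end}.
     */
    static int[] simulate(int windows, int ordersPerWindow, boolean clearWhenDone) {
        HeapKvState state = new HeapKvState();
        int maxPaneSize = 0;
        int nextOrderId = 0;
        for (int w = 0; w < windows; w++) {
            // A pane only lives for one window, like the per-window aggregation.
            Map<String, Double> pane = new HashMap<>();
            for (int i = 0; i < ordersPerWindow; i++) {
                String orderId = "order-" + nextOrderId++;
                double price = 10.0 + i;
                pane.merge(orderId, price, Double::sum); // reduce within the pane
                state.update(orderId, price);            // long-lived per-key state
            }
            maxPaneSize = Math.max(maxPaneSize, pane.size());
            // The window fires: the pane is emitted and then dropped.
            if (clearWhenDone) {
                // Hypothetical cleanup once an order is known to be finished.
                for (String orderId : pane.keySet()) {
                    state.clear(orderId);
                }
            }
        }
        return new int[] {maxPaneSize, state.size()};
    }

    public static void main(String[] args) {
        int[] leaky = simulate(1_000, 100, false);
        int[] bounded = simulate(1_000, 100, true);
        System.out.println("without cleanup: max pane = " + leaky[0] + ", state entries = " + leaky[1]);
        System.out.println("with cleanup:    max pane = " + bounded[0] + ", state entries = " + bounded[1]);
    }
}
```

Running main reports 100000 state entries without cleanup and 0 with it, while no pane ever holds more than 100 entries. Memory held by the windows stays flat, but the per-key state grows with the number of distinct keys ever seen, which is consistent with the HashMap.resize at the top of the trace.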
> 2015-12-01 17:42 GMT+01:00 Maximilian Michels <mxm@apache.org>:
>
>> Thanks! I've linked the issue in JIRA.
>>
>> On Tue, Dec 1, 2015 at 5:39 PM, Robert Metzger <rmetzger@apache.org>
>> wrote:
>> > I think it's this one https://issues.apache.org/jira/browse/KAFKA-824
>> >
>> > On Tue, Dec 1, 2015 at 5:37 PM, Maximilian Michels <mxm@apache.org>
>> > wrote:
>> >>
>> >> I know this has been fixed already, but out of curiosity, could you
>> >> point me to the Kafka JIRA issue for this bug? From the Flink issue, it
>> >> looks like a ZooKeeper version mismatch.
>> >>
>> >> On Tue, Dec 1, 2015 at 5:16 PM, Robert Metzger <rmetzger@apache.org>
>> >> wrote:
>> >> > Hi Gyula,
>> >> >
>> >> > no, I didn't ;) We still deploy 0.10-SNAPSHOT versions from the
>> >> > "release-0.10" branch to Apache's maven snapshot repository.
>> >> >
>> >> >
>> >> > I don't think Mihail's code will run when he's compiling it against
>> >> > 1.0-SNAPSHOT.
>> >> >
>> >> >
>> >> > On Tue, Dec 1, 2015 at 5:13 PM, Gyula Fóra <gyula.fora@gmail.com>
>> >> > wrote:
>> >> >>
>> >> >> Hi,
>> >> >>
>> >> >> I think Robert meant to write that you should set the connector
>> >> >> dependency to 1.0-SNAPSHOT.
>> >> >>
>> >> >> Cheers,
>> >> >> Gyula
>> >> >>
>> >> >> Robert Metzger <rmetzger@apache.org> wrote (Tue, Dec 1, 2015, 17:10):
>> >> >>>
>> >> >>> Hi Mihail,
>> >> >>>
>> >> >>> the issue is actually a bug in Kafka. We have a JIRA in Flink for
>> >> >>> this as well: https://issues.apache.org/jira/browse/FLINK-3067
>> >> >>>
>> >> >>> Sadly, we haven't released a fix for it yet. Flink 0.10.2 will
>> >> >>> contain a fix.
>> >> >>>
>> >> >>> Since the Kafka connector is not part of the Flink binary
>> >> >>> distribution, you can just set its version in your Maven pom file to
>> >> >>> 0.10-SNAPSHOT. Maven will then download the latest snapshot build,
>> >> >>> which contains the code planned for the 0.10.2 release.
>> >> >>>
>> >> >>> On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail
>> >> >>> <mihail.vieru@zalando.de>
>> >> >>> wrote:
>> >> >>>>
>> >> >>>> Hi,
>> >> >>>>
>> >> >>>> we get the following NullPointerException after ~50 minutes when
>> >> >>>> running a streaming job with windowing and state that reads data
>> >> >>>> from Kafka and writes the result to the local FS.
>> >> >>>> There are around 170 million messages to be processed; Flink 0.10.1
>> >> >>>> stops at ~8 million.
>> >> >>>> Flink runs locally, started with the "start-cluster-streaming.sh"
>> >> >>>> script.
>> >> >>>> 12/01/2015 15:06:24    Job execution switched to status RUNNING.
>> >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED
>> >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING
>> >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
>> >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
>> >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to RUNNING
>> >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
>> >> >>>> 12/01/2015 15:56:08    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
>> >> >>>> 12/01/2015 15:56:08    Source: Custom Source -> Map -> Map(1/1) switched to FAILED
>> >> >>>> java.lang.Exception
>> >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>> >> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
>> >> >>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>> >> >>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>> >> >>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>> >> >>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>> >> >>>>     at java.lang.Thread.run(Thread.java:745)
>> >> >>>> Caused by: java.lang.NullPointerException
>> >> >>>>     at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
>> >> >>>>     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
>> >> >>>>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>> >> >>>>     at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>> >> >>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>> >> >>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>> >> >>>>     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
>> >> >>>>     at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
>> >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
>> >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
>> >> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
>> >> >>>>
>> >> >>>>
>> >> >>>> Any ideas on what could cause this behaviour?
>> >> >>>>
>> >> >>>> Best,
>> >> >>>> Mihail
>> >> >>>
>> >> >>>
>> >> >
>> >
>> >
>>
>
>
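In the original report, the NullPointerException is thrown on the offset-commit path (FlinkKafkaConsumer$PeriodicOffsetCommitter.run, then ZookeeperOffsetHandler.commit, then ZkClient), yet the task fails in LegacyFetcher.run with a bare java.lang.Exception whose cause is that NPE. That shape suggests the committer runs on its own thread and its error is handed to the fetch loop, which rethrows it. The plain-Java sketch below reproduces only that error-propagation shape under this assumption; OffsetStore, runFetchLoop, and the failure threshold are hypothetical and do not reflect how the 0.10.1 connector is actually written.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Plain-Java sketch; OffsetStore and all other names are hypothetical and are
// not the Flink Kafka connector's API.
public class PeriodicCommitSketch {

    /** Hypothetical stand-in for the ZooKeeper-backed offset store. */
    interface OffsetStore {
        void commit(long offset) throws Exception;
    }

    /**
     * Advances a fake offset until the committer thread reports a failure, then
     * rethrows that failure wrapped in a plain java.lang.Exception. Returns
     * normally if no failure is reported within timeoutMillis.
     */
    static void runFetchLoop(OffsetStore store, long timeoutMillis) throws Exception {
        AtomicLong offset = new AtomicLong();
        AtomicReference<Throwable> committerError = new AtomicReference<>();
        ScheduledExecutorService committer = Executors.newSingleThreadScheduledExecutor();
        committer.scheduleAtFixedRate(() -> {
            try {
                store.commit(offset.get());
            } catch (Throwable t) {
                committerError.compareAndSet(null, t); // keep only the first failure
            }
        }, 5, 5, TimeUnit.MILLISECONDS);
        try {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (System.currentTimeMillis() < deadline) {
                offset.incrementAndGet(); // pretend one record was consumed
                Throwable error = committerError.get();
                if (error != null) {
                    // Same shape as the trace: java.lang.Exception, "Caused by" the committer's error.
                    throw new Exception(error);
                }
                Thread.sleep(1);
            }
        } finally {
            committer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Hypothetical store that starts failing past offset 20, like the NPE in ZkClient.
        OffsetStore failing = offset -> {
            if (offset > 20) {
                throw new NullPointerException("simulated ZooKeeper client failure");
            }
        };
        try {
            runFetchLoop(failing, 5_000);
            System.out.println("no failure within the timeout");
        } catch (Exception e) {
            System.out.println(e.getClass().getName() + ", caused by " + e.getCause());
        }
    }
}
```

Running main prints "java.lang.Exception, caused by java.lang.NullPointerException: simulated ZooKeeper client failure". The reason for this design is that an error on a background thread would otherwise go unnoticed; surfacing it through the main loop fails the whole source task, consistent with the job log, where the source switches to FAILED and the downstream window task is canceled.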
