flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Vieru, Mihail" <mihail.vi...@zalando.de>
Subject Re: NPE with Flink Streaming from Kafka
Date Wed, 02 Dec 2015 17:11:33 GMT
Hi Gyula, Hi Stephan,

thank you for your replies.

We need a state which grows indefinitely for the following use case. An
event is created when a customer places an order. Another event is created
when the order is sent. These events typically occur within days. We need
to catch the cases when the said events occur over a specified time period
to raise an alarm.

So having a window of a couple of days is not feasible. Thus we need the
state.

I believe having a different state backend would circumvent the OOM issue.
We were thinking of Redis for performance reasons. MySQL might do as well,
if it doesn't slow down the processing too much.

Are there limitations for SqlStateBackend when working with state only?
When would the window state limitation occur?

Cheers,
Mihail


2015-12-02 13:38 GMT+01:00 Stephan Ewen <sewen@apache.org>:

> Mihail!
>
> The Flink windows are currently in-memory only. There are plans to relax
> that, but for the time being, having enough memory in the cluster is
> important.
>
> @Gyula: I think window state is currently also limited when using the
> SqlStateBackend, by the size of a row in the database (because windows are
> not key/value state currently)
>
>
> Here are some simple rules-of-thumb to work with:
>
> 1) For windows, the number of expected keys can be without bound. It is
> important to have a rough upper bound for the number of "active keys at a
> certain time". For example, if you have your time windows (let's say by 10
> minutes or so), it only matters how many keys you have within each 10
> minute interval. Those define how much memory you need.
>
> 2) If you work with the "OperatorState" abstraction, then you need to
> think about cleanup a bit. The OperatorState keeps state currently for as
> long until you set the state for the key to "null". This manual state is
> explicitly designed to allow you to keep state across windows and across
> very long time. On the flip side, you need to manage the amount of state
> you store, by releasing state for keys.
>
> 3) If a certain key space grows infinite, you should "scope the state by
> time". A pragmatic solution for that is to define a session window:
>   - The session length defines after what inactivity the state is cleaned
> (let's say 1h session length or so)
>   - The trigger implements this session (there are a few mails on this
> list already that explain how to do this) and take care of evaluating on
> every element.
>   - A count(1) evictor makes sure only one element is ever stored
>
> Greetings,
> Stephan
>
>
> On Wed, Dec 2, 2015 at 11:37 AM, Gyula Fóra <gyfora@apache.org> wrote:
>
>> Hi,
>>
>> I am working on a use case that involves storing state for billions of
>> keys. For this we use a MySql state backend that will write each key-value
>> state to MySql server so it will only hold a limited set of key-value pairs
>> on heap while maintaining the processing guarantees.
>>
>> This will keep our streaming job from running out of memory as most of
>> the state is off heap. I am not sure if this is relevant to your use case
>> but if the state size grows indefinitely you might want to give it a try.
>>
>> I will write a detailed guide in some days but if you want to get started
>> check this one out:
>>
>> https://docs.google.com/document/d/1xx3J88ZR_kuYYHAil3HD3YSBLqBTKV4Pu1SLwhiRiGs/edit?usp=sharing
>>
>> There are some pending improvements that I will commit in the next days
>> that will increase the performance of the MySql adapter
>>
>> Let me know if you are interested in this!
>>
>> Cheers,
>> Gyula
>>
>>
>> Vieru, Mihail <mihail.vieru@zalando.de> ezt írta (időpont: 2015. dec.
>> 2., Sze, 11:26):
>>
>>> Hi Aljoscha,
>>>
>>> we have no upper bound for the number of expected keys. The max size for
>>> an element is 1 KB.
>>>
>>> There are 2 Maps, a KeyBy, a timeWindow, a Reduce and a writeAsText
>>> operators in the job. In the first Map we parse the contained JSON object
>>> in each element and forward it as a Flink Tuple. In the Reduce we update
>>> the state for each key. That's about it.
>>>
>>> Best,
>>> Mihail
>>>
>>>
>>> 2015-12-02 11:09 GMT+01:00 Aljoscha Krettek <aljoscha@apache.org>:
>>>
>>>> Hi Mihail,
>>>> could you please give some information about the number of keys that
>>>> you are expecting in the data and how big the elements are that you are
>>>> processing in the window.
>>>>
>>>> Also, are there any other operations that could be taxing on Memory. I
>>>> think the different exception you see for 500MB mem size is just because
>>>> Java notices that it ran out of memory at a different part in the program.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>> > On 02 Dec 2015, at 10:57, Vieru, Mihail <mihail.vieru@zalando.de>
>>>> wrote:
>>>> >
>>>> > Yes, with the "start-cluster-streaming.sh" script.
>>>> > If the TaskManager gets 5GB of heap it manages to process ~100
>>>> million messages and then throws the above OOM.
>>>> > If it gets only 500MB it manages to process ~8 million and a somewhat
>>>> misleading exception is thrown:
>>>> >
>>>> > 12/01/2015 19:14:07    Source: Custom Source -> Map -> Map(1/1)
>>>> switched to FAILED
>>>> > java.lang.Exception: Java heap space
>>>> >     at
>>>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>>> >     at
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:399)
>>>> >     at
>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>>> >     at
>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>>> >     at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>>> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>> >     at java.lang.Thread.run(Thread.java:745)
>>>> > Caused by: java.lang.OutOfMemoryError: Java heap space
>>>> >     at org.json.simple.parser.Yylex.<init>(Yylex.java:231)
>>>> >     at org.json.simple.parser.JSONParser.<init>(JSONParser.java:34)
>>>> >     at
>>>> de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:70)
>>>> >     at
>>>> de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:65)
>>>> >     at
>>>> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:37)
>>>> >     at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:316)
>>>> >     at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:300)
>>>> >     at
>>>> org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:48)
>>>> >     at
>>>> org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:29)
>>>> >     at
>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$SourceOutput.collect(SourceStreamTask.java:97)
>>>> >     at
>>>> org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:92)
>>>> >     at
>>>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:450)
>>>> >
>>>> >
>>>> >
>>>> >
>>>> > 2015-12-02 10:45 GMT+01:00 Robert Metzger <rmetzger@apache.org>:
>>>> > Its good news that the issue has been resolved.
>>>> >
>>>> > Regarding the OOM, did you start Flink in the streaming mode?
>>>> >
>>>> > On Wed, Dec 2, 2015 at 10:18 AM, Vieru, Mihail <
>>>> mihail.vieru@zalando.de> wrote:
>>>> > Thank you, Robert! The issue with Kafka is now solved with the
>>>> 0.10-SNAPSHOT dependency.
>>>> >
>>>> > We have run into an OutOfMemory exception though, which appears to be
>>>> related to the state. As my colleague, Javier Lopez, mentioned in a
>>>> previous thread, state handling is crucial for our use case. And as the
>>>> jobs are intended to run for months, stability plays an important role in
>>>> choosing a stream processing framework.
>>>> >
>>>> > 12/02/2015 10:03:53    Fast TumblingTimeWindows(5000) of Reduce at
>>>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
FAILED
>>>> > java.lang.OutOfMemoryError: Java heap space
>>>> >     at java.util.HashMap.resize(HashMap.java:703)
>>>> >     at java.util.HashMap.putVal(HashMap.java:662)
>>>> >     at java.util.HashMap.put(HashMap.java:611)
>>>> >     at
>>>> org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
>>>> >     at
>>>> de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:121)
>>>> >     at
>>>> de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:108)
>>>> >     at
>>>> org.apache.flink.streaming.runtime.operators.windowing.KeyMap.putOrAggregate(KeyMap.java:196)
>>>> >     at
>>>> org.apache.flink.streaming.runtime.operators.windowing.AggregatingKeyedTimePanes.addElementToLatestPane(AggregatingKeyedTimePanes.java:50)
>>>> >     at
>>>> org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.processElement(AbstractAlignedProcessingTimeWindowOperator.java:210)
>>>> >     at
>>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
>>>> >     at
>>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>>>> >     at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>>> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>> >     at java.lang.Thread.run(Thread.java:745)
>>>> >
>>>> >
>>>> >
>>>> >
>>>> > 2015-12-01 17:42 GMT+01:00 Maximilian Michels <mxm@apache.org>:
>>>> > Thanks! I've linked the issue in JIRA.
>>>> >
>>>> > On Tue, Dec 1, 2015 at 5:39 PM, Robert Metzger <rmetzger@apache.org>
>>>> wrote:
>>>> > > I think its this one
>>>> https://issues.apache.org/jira/browse/KAFKA-824
>>>> > >
>>>> > > On Tue, Dec 1, 2015 at 5:37 PM, Maximilian Michels <mxm@apache.org>
>>>> wrote:
>>>> > >>
>>>> > >> I know this has been fixed already but, out of curiosity, could
you
>>>> > >> point me to the Kafka JIRA issue for this
>>>> > >> bug? From the Flink issue it looks like this is a Zookeeper
version
>>>> > >> mismatch.
>>>> > >>
>>>> > >> On Tue, Dec 1, 2015 at 5:16 PM, Robert Metzger <
>>>> rmetzger@apache.org>
>>>> > >> wrote:
>>>> > >> > Hi Gyula,
>>>> > >> >
>>>> > >> > no, I didn't ;) We still deploy 0.10-SNAPSHOT versions
from the
>>>> > >> > "release-0.10" branch to Apache's maven snapshot repository.
>>>> > >> >
>>>> > >> >
>>>> > >> > I don't think Mihail's code will run when he's compiling
it
>>>> against
>>>> > >> > 1.0-SNAPSHOT.
>>>> > >> >
>>>> > >> >
>>>> > >> > On Tue, Dec 1, 2015 at 5:13 PM, Gyula Fóra <gyula.fora@gmail.com>
>>>> wrote:
>>>> > >> >>
>>>> > >> >> Hi,
>>>> > >> >>
>>>> > >> >> I think Robert meant to write setting the connector
dependency
>>>> to
>>>> > >> >> 1.0-SNAPSHOT.
>>>> > >> >>
>>>> > >> >> Cheers,
>>>> > >> >> Gyula
>>>> > >> >>
>>>> > >> >> Robert Metzger <rmetzger@apache.org> ezt írta
(időpont: 2015.
>>>> dec. 1.,
>>>> > >> >> K,
>>>> > >> >> 17:10):
>>>> > >> >>>
>>>> > >> >>> Hi Mihail,
>>>> > >> >>>
>>>> > >> >>> the issue is actually a bug in Kafka. We have
a JIRA in Flink
>>>> for this
>>>> > >> >>> as
>>>> > >> >>> well: https://issues.apache.org/jira/browse/FLINK-3067
>>>> > >> >>>
>>>> > >> >>> Sadly, we haven't released a fix for it yet. Flink
0.10.2 will
>>>> contain
>>>> > >> >>> a
>>>> > >> >>> fix.
>>>> > >> >>>
>>>> > >> >>> Since the kafka connector is not contained in
the flink
>>>> binary, you
>>>> > >> >>> can
>>>> > >> >>> just set the version in your maven pom file to
0.10-SNAPSHOT.
>>>> Maven
>>>> > >> >>> will
>>>> > >> >>> then download the code planned for the 0.10-SNAPSHOT
release.
>>>> > >> >>>
>>>> > >> >>> On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail
>>>> > >> >>> <mihail.vieru@zalando.de>
>>>> > >> >>> wrote:
>>>> > >> >>>>
>>>> > >> >>>> Hi,
>>>> > >> >>>>
>>>> > >> >>>> we get the following NullPointerException
after ~50 minutes
>>>> when
>>>> > >> >>>> running
>>>> > >> >>>> a streaming job with windowing and state that
reads data from
>>>> Kafka
>>>> > >> >>>> and
>>>> > >> >>>> writes the result to local FS.
>>>> > >> >>>> There are around 170 million messages to be
processed, Flink
>>>> 0.10.1
>>>> > >> >>>> stops at ~8 million.
>>>> > >> >>>> Flink runs locally, started with the
>>>> "start-cluster-streaming.sh"
>>>> > >> >>>> script.
>>>> > >> >>>>
>>>> > >> >>>> 12/01/2015 15:06:24    Job execution switched
to status
>>>> RUNNING.
>>>> > >> >>>> 12/01/2015 15:06:24    Source: Custom Source
-> Map ->
>>>> Map(1/1)
>>>> > >> >>>> switched
>>>> > >> >>>> to SCHEDULED
>>>> > >> >>>> 12/01/2015 15:06:24    Source: Custom Source
-> Map ->
>>>> Map(1/1)
>>>> > >> >>>> switched
>>>> > >> >>>> to DEPLOYING
>>>> > >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000)
of
>>>> Reduce at
>>>> > >> >>>> main(ItemPriceAvgPerOrder.java:108) ->
Sink: Unnamed(1/1)
>>>> switched to
>>>> > >> >>>> SCHEDULED
>>>> > >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000)
of
>>>> Reduce at
>>>> > >> >>>> main(ItemPriceAvgPerOrder.java:108) ->
Sink: Unnamed(1/1)
>>>> switched to
>>>> > >> >>>> DEPLOYING
>>>> > >> >>>> 12/01/2015 15:06:24    Source: Custom Source
-> Map ->
>>>> Map(1/1)
>>>> > >> >>>> switched
>>>> > >> >>>> to RUNNING
>>>> > >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000)
of
>>>> Reduce at
>>>> > >> >>>> main(ItemPriceAvgPerOrder.java:108) ->
Sink: Unnamed(1/1)
>>>> switched to
>>>> > >> >>>> RUNNING
>>>> > >> >>>> 12/01/2015 15:56:08    Fast TumblingTimeWindows(5000)
of
>>>> Reduce at
>>>> > >> >>>> main(ItemPriceAvgPerOrder.java:108) ->
Sink: Unnamed(1/1)
>>>> switched to
>>>> > >> >>>> CANCELED
>>>> > >> >>>> 12/01/2015 15:56:08    Source: Custom Source
-> Map ->
>>>> Map(1/1)
>>>> > >> >>>> switched
>>>> > >> >>>> to FAILED
>>>> > >> >>>> java.lang.Exception
>>>> > >> >>>>     at
>>>> > >> >>>>
>>>> > >> >>>>
>>>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>>> > >> >>>>     at
>>>> > >> >>>>
>>>> > >> >>>>
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
>>>> > >> >>>>     at
>>>> > >> >>>>
>>>> > >> >>>>
>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>>> > >> >>>>     at
>>>> > >> >>>>
>>>> > >> >>>>
>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>>> > >> >>>>     at
>>>> > >> >>>>
>>>> > >> >>>>
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>>> > >> >>>>     at
>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>> > >> >>>>     at java.lang.Thread.run(Thread.java:745)
>>>> > >> >>>> Caused by: java.lang.NullPointerException
>>>> > >> >>>>     at
>>>> > >> >>>>
>>>> > >> >>>>
>>>> org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
>>>> > >> >>>>     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
>>>> > >> >>>>     at
>>>> > >> >>>>
>>>> org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>>>> > >> >>>>     at
>>>> > >> >>>>
>>>> org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>>>> > >> >>>>     at
>>>> org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>>>> > >> >>>>     at
>>>> org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>>>> > >> >>>>     at
>>>> kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
>>>> > >> >>>>     at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
>>>> > >> >>>>     at
>>>> > >> >>>>
>>>> > >> >>>>
>>>> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
>>>> > >> >>>>     at
>>>> > >> >>>>
>>>> > >> >>>>
>>>> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
>>>> > >> >>>>     at
>>>> > >> >>>>
>>>> > >> >>>>
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
>>>> > >> >>>>
>>>> > >> >>>>
>>>> > >> >>>> Any ideas on what could cause this behaviour?
>>>> > >> >>>>
>>>> > >> >>>> Best,
>>>> > >> >>>> Mihail
>>>> > >> >>>
>>>> > >> >>>
>>>> > >> >
>>>> > >
>>>> > >
>>>> >
>>>> >
>>>> >
>>>>
>>>>
>>>
>

Mime
View raw message