flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Streaming - memory management
Date Thu, 01 Sep 2016 15:21:19 GMT
Hi Vinay,

can you give a bit more detail about how you plan to implement the outer
join? Using a WindowFunction or a CoFlatMapFunction on a KeyedStream?

An alternative could be to use a CoGroup operator which collects from two
inputs all elements that share a common key (the join key) and are in the
same window. The interface of the function provides two iterators over the
elements of both inputs and can be used to implement outer join
functionality. The benefit of working with a CoGroupFunction is that you do
not have to take care of state handling at all.
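
For illustration, here is a rough plain-Java sketch of the outer-join logic that could sit inside a CoGroupFunction. For one key and one window, the function receives every element from both inputs. If both sides are non-empty, it emits all matching pairs. Otherwise it emits the unmatched elements padded with null. This is not Flink code: the String record type, the "left|right" output format and the class name OuterJoinSketch are assumptions made for the example, and results go into a List instead of Flink's Collector.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch, not Flink code: mimics what a CoGroupFunction sees for
// ONE key in ONE window. String records and the "left|right" output format
// are hypothetical.
final class OuterJoinSketch {

    static List<String> coGroup(Iterable<String> left, Iterable<String> right) {
        List<String> out = new ArrayList<>();

        // Materialize the right side so it can be scanned once per left element.
        List<String> rightElems = new ArrayList<>();
        for (String r : right) {
            rightElems.add(r);
        }

        boolean sawLeft = false;
        for (String l : left) {
            sawLeft = true;
            if (rightElems.isEmpty()) {
                out.add(l + "|null");            // left element without a match
            } else {
                for (String r : rightElems) {
                    out.add(l + "|" + r);        // matching pair
                }
            }
        }

        if (!sawLeft) {
            for (String r : rightElems) {
                out.add("null|" + r);            // right element without a match
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(coGroup(List.of("a1"), List.of("b1", "b2"))); // [a1|b1, a1|b2]
        System.out.println(coGroup(List.of("a1"), List.of()));           // [a1|null]
        System.out.println(coGroup(List.of(), List.of("b1")));           // [null|b1]
    }
}
```

In a real job the same body would go into coGroup(Iterable<IN1>, Iterable<IN2>, Collector<O>), with out.collect(...) in place of out.add(...), and the operator would be wired up roughly as first.coGroup(second).where(...).equalTo(...).window(...).apply(...). Flink keeps and discards the window contents, so no user state is involved.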

In case you go for a custom implementation, you will need to work with
operator state.
However, you do not need to interact with RocksDB directly. Flink takes
care of that for you.
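
If you go the custom route instead, the buffer-and-match approach Vinay describes could look roughly like the plain-Java sketch below. A HashMap per input stands in for per-key state (in Flink this would typically be a ValueState in a RichCoFlatMapFunction on connected, keyed streams), and remove() plays the role of clear(), so the state for a key is released as soon as it is matched. The class and method names, the String records, and the rule of buffering only one record per key and side are assumptions made for the sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java sketch, not Flink code. Each HashMap stands in for per-key
// state (e.g. a Flink ValueState); remove() plays the role of clear().
// Assumption: at most one pending record per key and side.
final class BufferedMatcher {
    private final Map<String, String> pendingLeft = new HashMap<>();
    private final Map<String, String> pendingRight = new HashMap<>();

    // A left record arrives: join it with a buffered right record if one
    // exists (and release that state), otherwise buffer it.
    Optional<String> onLeft(String key, String value) {
        String right = pendingRight.remove(key);
        if (right != null) {
            return Optional.of(value + "|" + right);
        }
        pendingLeft.put(key, value);
        return Optional.empty();
    }

    // Symmetric handling for the right input.
    Optional<String> onRight(String key, String value) {
        String left = pendingLeft.remove(key);
        if (left != null) {
            return Optional.of(left + "|" + value);
        }
        pendingRight.put(key, value);
        return Optional.empty();
    }

    // Keys still holding state. Without clean-up for keys that never
    // match, this number only grows.
    int pendingKeys() {
        return pendingLeft.size() + pendingRight.size();
    }

    public static void main(String[] args) {
        BufferedMatcher m = new BufferedMatcher();
        System.out.println(m.onLeft("k1", "a"));   // Optional.empty
        System.out.println(m.onRight("k1", "b"));  // Optional[a|b]
        System.out.println(m.onRight("k2", "c"));  // Optional.empty
        System.out.println(m.pendingKeys());       // 1
    }
}
```

Note that k2 stays buffered forever in this sketch. That is the unbounded-growth problem discussed further down the thread: keys that never find a match need some clean-up mechanism (for example, the state expiry feature mentioned there).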

Best, Fabian

2016-09-01 16:13 GMT+02:00 vinay patil <vinay18.patil@gmail.com>:

> Hi Fabian/Stephan,
>
> Waiting for your suggestion.
>
> Regards,
> Vinay Patil
>
> On Wed, Aug 31, 2016 at 1:46 PM, Vinay Patil wrote:
>
>> Hi Fabian/Stephan,
>>
>> This makes things clear.
>>
>> This is the use case I have:
>> I am performing an outer join operation on the two streams (in a window),
>> after which I get matchingAndNonMatchingStream. Now I want to make sure
>> that the matching rate is high (matching cannot happen if one of the
>> sources is not emitting elements for a certain time). To tackle this
>> situation, I was thinking of using RocksDB as a state backend, where I will
>> insert the unmatched records (the key will be the same as the one used for
>> the window, and the value will be the DTO). Before inserting a record, I
>> will check whether its key is already present in RocksDB; if yes, I will
>> take the data from it and send it downstream (and make sure I clean up the
>> state for that key).
>> (Also, the stored data should be encrypted; the encryption part can be
>> handled.)
>>
>> So instead of using Cassandra, can I do this using RocksDB as the state
>> backend, since the state is not gone after checkpointing?
>>
>> P.S. I have kept the watermark 1500 secs behind, just to be safe when
>> handling late elements, but to tackle edge-case scenarios like the one
>> mentioned above, we have a backup plan of using Cassandra as an external
>> store, since we are dealing with critical financial data.
>>
>> Regards,
>> Vinay Patil
>>
>> On Wed, Aug 31, 2016 at 11:34 AM, Fabian Hueske wrote:
>>
>>> Hi Vinay,
>>>
>>> if you use user-defined state, you have to manually clear it.
>>> Otherwise, it will stay in the state backend (heap or RocksDB) until the
>>> job goes down (planned or due to an OOM error).
>>>
>>> This is especially important to keep in mind when using keyed state.
>>> If you have an unbounded, evolving key space, you will likely run out of
>>> memory.
>>> The job will constantly add state for each new key but won't be able to
>>> clean up the state for "expired" keys.
>>>
>>> You could implement a clean-up mechanism if you implement a custom
>>> stream operator.
>>> However, this is a very low-level interface and requires a solid
>>> understanding of internals like timestamps, watermarks, and the
>>> checkpointing mechanism.
>>>
>>> The community is currently working on a state expiry feature (state will
>>> be discarded if not requested or updated for x minutes).
>>>
>>> Regarding the second question: does state remain local after
>>> checkpointing?
>>> Yes, the local state is only copied to the remote FS (HDFS, S3, ...) but
>>> remains in the operator. So the state is not gone after a checkpoint is
>>> completed.
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>> 2016-08-31 18:17 GMT+02:00 Vinay Patil:
>>>
>>> > Hi Stephan,
>>> >
>>> > Just wanted to jump into this discussion regarding state.
>>> >
>>> > So do you mean that if we maintain user-defined state (for non-window
>>> > operators) and do not clear it explicitly, the data for that key
>>> > remains in RocksDB?
>>> >
>>> > What happens in case of a checkpoint? I read in the documentation that
>>> > after the checkpoint happens, the RocksDB data is pushed to the desired
>>> > location (HDFS, S3, or another FS). So for user-defined state, does the
>>> > data still remain in RocksDB after the checkpoint?
>>> >
>>> > Correct me if I have misunderstood this concept.
>>> >
>>> > For one of our use cases we were going for this, but since I read the
>>> > above part in the documentation, we are going for Cassandra now (to
>>> > store records and query them for a special case).
>>> >
>>> > Regards,
>>> > Vinay Patil
>>> >
>>> > On Wed, Aug 31, 2016 at 4:51 AM, Stephan Ewen wrote:
>>> >
>>> > > In streaming, memory is mainly needed for state (key/value state). The
>>> > > exact representation depends on the chosen StateBackend.
>>> > >
>>> > > State is explicitly released: for windows, state is cleaned up
>>> > > automatically (firing / expiry); for user-defined state, keys have to
>>> > > be explicitly cleared (clear() method) or, in the future, will have
>>> > > the option to expire.
>>> > >
>>> > > The heavy workhorse for streaming state is currently RocksDB, which
>>> > > internally uses native (off-heap) memory to keep the data.
>>> > >
>>> > > Does that help?
>>> > >
>>> > > Stephan
>>> > >
>>> > >
>>> > > On Tue, Aug 30, 2016 at 11:52 PM, Roshan Naik wrote:
>>> > >
>>> > > > As per the docs, in batch mode, dynamic memory allocation is avoided
>>> > > > by storing messages being processed in ByteBuffers via Unsafe
>>> > > > methods.
>>> > > >
>>> > > > I couldn't find any docs describing memory management in streaming
>>> > > > mode. So...
>>> > > >
>>> > > > - I am wondering if this is also the case in streaming.
>>> > > >
>>> > > > - If so, how does Flink detect that an object is no longer being used
>>> > > > and can be reclaimed for reuse?
>>> > > >
>>> > > > -roshan
>>> > > >
>>> > >
>>> >
>>>
>>
>>
>
