flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vinay patil <vinay18.pa...@gmail.com>
Subject Re: Streaming - memory management
Date Thu, 01 Sep 2016 16:29:45 GMT
Hi Fabian,

I had already used Co-Group function earlier but were getting some issues
while dealing with watermarks (for one use case I was not getting the
correct result), so I have used the union operator for performing the
outer-join (WindowFunction on a keyedStream), this approach is working
correctly and giving me correct results.

As I have discussed the scenario, I want to maintain the non-matching
records in some store, so that's why I was thinking of using RocksDB as a
store here, where I will maintain the user-defined state  after the
outer-join window operator, and I can query it using Flink to check if the
value for a particular key is present or not , if present I can match them
and send it downstream.

The final goal is to have zero non-matching records, so this is the backup
plan to handle edge-case scenarios.

I have already integrated code to write to Cassandra using Flink Connector,
but I think this will be a better option rather than hitting the query to
external store since RocksDb will store the data to local TM disk, the
retrieval will be faster here than Cassandra , right ?

What do you think ?


Regards,
Vinay Patil

On Thu, Sep 1, 2016 at 10:19 AM, Fabian Hueske-2 [via Apache Flink User
Mailing List archive.] <ml-node+s2336050n8832h44@n4.nabble.com> wrote:

> Hi Vinay,
>
> can you give a bit more detail about how you plan to implement the outer
> join? Using a WIndowFunction or a CoFlatMapFunction on a KeyedStream?
>
> An alternative could be to use a CoGroup operator which collects from two
> inputs all elements that share a common key (the join key) and are in the
> same window. The interface of the function provides two iterators over the
> elements of both inputs and can be used to implement outer join
> functionality. The benefit of working with a CoGroupFunction is that you do
> not have to take care of state handling at all.
>
> In case you go for a custom implementation you will need to work with
> operator state.
> However, you do not need to directly interact with RocksDB. Flink is
> taking care of that for you.
>
> Best, Fabian
>
> 2016-09-01 16:13 GMT+02:00 vinay patil <[hidden email]
> <http:///user/SendEmail.jtp?type=node&node=8832&i=0>>:
>
>> Hi Fabian/Stephan,
>>
>> Waiting for your suggestion
>>
>> Regards,
>> Vinay Patil
>>
>> On Wed, Aug 31, 2016 at 1:46 PM, Vinay Patil <[hidden email]
>> <http:///user/SendEmail.jtp?type=node&node=8829&i=0>> wrote:
>>
>>> Hi Fabian/Stephan,
>>>
>>> This makes things clear.
>>>
>>> This is the use case I have :
>>> I am performing a outer join operation on the two streams (in window)
>>> after which I get matchingAndNonMatchingStream, now I want to make sure
>>> that the matching rate is high (matching cannot happen if one of the source
>>> is not emitting elements for certain time) , so to tackle this situation I
>>> was thinking of using RocksDB as a state Backend, where I will insert the
>>> unmatched records in it (key - will be same as used for window and value
>>> will be DTO ), so before inserting into it I will check if it is already
>>> present in RocksDB, if yes I will take the data from it and send it
>>> downstream (and ensure I perform the clean operation for that key).
>>> (Also the data to store should be encrypted, encryption part can be
>>> handled )
>>>
>>> so instead of using Cassandra , Can I do this using RocksDB as state
>>> backend since the state is not gone after checkpointing ?
>>>
>>> P.S I have kept the watermark behind by 1500 secs just to be safe on
>>> handling late elements but to tackle edge case scenarios like the one
>>> mentioned above we are having a backup plan of using Cassandra as external
>>> store since we are dealing with financial critical data.
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Wed, Aug 31, 2016 at 11:34 AM, Fabian Hueske <[hidden email]
>>> <http:///user/SendEmail.jtp?type=node&node=8829&i=1>> wrote:
>>>
>>>> Hi Vinaj,
>>>>
>>>> if you use user-defined state, you have to manually clear it.
>>>> Otherwise, it will stay in the state backend (heap or RocksDB) until the
>>>> job goes down (planned or due to an OOM error).
>>>>
>>>> This is esp. important to keep in mind, when using keyed state.
>>>> If you have an unbounded, evolving key space you will likely run
>>>> out-of-memory.
>>>> The job will constantly add state for each new key but won't be able to
>>>> clean up the state for "expired" keys.
>>>>
>>>> You could implement a clean-up mechanism this if you implement a custom
>>>> stream operator.
>>>> However this is a very low level interface and requires solid
>>>> understanding
>>>> of the internals like timestamps, watermarks and the checkpointing
>>>> mechanism.
>>>>
>>>> The community is currently working on a state expiry feature (state
>>>> will be
>>>> discarded if not requested or updated for x minutes).
>>>>
>>>> Regarding the second question: Does state remain local after
>>>> checkpointing?
>>>> Yes, the local state is only copied to the remote FS (HDFS, S3, ...) but
>>>> remains in the operator. So the state is not gone after a checkpoint is
>>>> completed.
>>>>
>>>> Hope this helps,
>>>> Fabian
>>>>
>>>> 2016-08-31 18:17 GMT+02:00 Vinay Patil <[hidden email]
>>>> <http:///user/SendEmail.jtp?type=node&node=8829&i=2>>:
>>>>
>>>> > Hi Stephan,
>>>> >
>>>> > Just wanted to jump into this discussion regarding state.
>>>> >
>>>> > So do you mean that if we maintain user-defined state (for non-window
>>>> > operators), then if we do  not clear it explicitly will the data for
>>>> that
>>>> > key remains in RocksDB.
>>>> >
>>>> > What happens in case of checkpoint ? I read in the documentation that
>>>> after
>>>> > the checkpoint happens the rocksDB data is pushed to the desired
>>>> location
>>>> > (hdfs or s3 or other fs), so for user-defined state does the data
>>>> still
>>>> > remain in RocksDB after checkpoint ?
>>>> >
>>>> > Correct me if I have misunderstood this concept
>>>> >
>>>> > For one of our use we were going for this, but since I read the above
>>>> part
>>>> > in documentation so we are going for Cassandra now (to store records
>>>> and
>>>> > query them for a special case)
>>>> >
>>>> >
>>>> >
>>>> >
>>>> >
>>>> > Regards,
>>>> > Vinay Patil
>>>> >
>>>> > On Wed, Aug 31, 2016 at 4:51 AM, Stephan Ewen <[hidden email]
>>>> <http:///user/SendEmail.jtp?type=node&node=8829&i=3>> wrote:
>>>> >
>>>> > > In streaming, memory is mainly needed for state (key/value state).
>>>> The
>>>> > > exact representation depends on the chosen StateBackend.
>>>> > >
>>>> > > State is explicitly released: For windows, state is cleaned up
>>>> > > automatically (firing / expiry), for user-defined state, keys have
>>>> to be
>>>> > > explicitly cleared (clear() method) or in the future will have
the
>>>> option
>>>> > > to expire.
>>>> > >
>>>> > > The heavy work horse for streaming state is currently RocksDB,
which
>>>> > > internally uses native (off-heap) memory to keep the data.
>>>> > >
>>>> > > Does that help?
>>>> > >
>>>> > > Stephan
>>>> > >
>>>> > >
>>>> > > On Tue, Aug 30, 2016 at 11:52 PM, Roshan Naik <[hidden email]
>>>> <http:///user/SendEmail.jtp?type=node&node=8829&i=4>>
>>>> > > wrote:
>>>> > >
>>>> > > > As per the docs, in Batch mode, dynamic memory allocation
is
>>>> avoided by
>>>> > > > storing messages being processed in ByteBuffers via Unsafe
>>>> methods.
>>>> > > >
>>>> > > > Couldn't find any docs  describing mem mgmt in Streamingn
mode.
>>>> So...
>>>> > > >
>>>> > > > - Am wondering if this is also the case with Streaming ?
>>>> > > >
>>>> > > > - If so, how does Flink detect that an object is no longer
being
>>>> used
>>>> > and
>>>> > > > can be reclaimed for reuse once again ?
>>>> > > >
>>>> > > > -roshan
>>>> > > >
>>>> > >
>>>> >
>>>>
>>>
>>>
>>
>> ------------------------------
>> View this message in context: Re: Streaming - memory management
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Streaming-memory-management-tp8829.html>
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
>> at Nabble.com.
>>
>
>
>
> ------------------------------
> If you reply to this email, your message will be added to the discussion
> below:
> http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/Re-Streaming-memory-management-tp8829p8832.html
> To start a new topic under Apache Flink User Mailing List archive., email
> ml-node+s2336050n1h83@n4.nabble.com
> To unsubscribe from Apache Flink User Mailing List archive., click here
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=1&code=dmluYXkxOC5wYXRpbEBnbWFpbC5jb218MXwxODExMDE2NjAx>
> .
> NAML
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>




--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Streaming-memory-management-tp8829p8836.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Mime
View raw message