flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Sharing Java Collections within Flink Cluster
Date Wed, 07 Sep 2016 20:15:27 GMT
That depends.
1) Growing/Shrinking: This should work. New entries can always be inserted.
To remove an entry from the key-value state, you have to set its value to
null. Note that this requires an explicit delete record in the stream to
trigger the eviction.
2) Multiple lookups: This only works if all lookups are independent of
each other. You can partition DS1 on a single key only, and the other
keys might be located on different shards. A workaround might be to
duplicate each DS1 event once for every key that it needs to look up.
However, you might then need to collect the results that belong to the
same DS1 event after the join. If that does not work for you, the only
thing that comes to my mind is to broadcast the state and keep a full
local copy in each operator.
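
As a rough illustration of both points, here is a minimal sketch in plain
Java. It is not Flink code: the class ShardedStateSketch, its methods, and
the hash-modulo routing are all made up for this example, and each map only
stands in for the key-value state held by one parallel operator instance. A
null value acts as the delete record, and a multi-key lookup is fanned out
per key and collected again afterwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation, NOT Flink API: each "shard" stands in for the
// key-value state held by one parallel operator instance.
class ShardedStateSketch {
    final List<Map<String, String>> shards = new ArrayList<>();

    ShardedStateSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            shards.add(new HashMap<>());
        }
    }

    // Assumed hash-modulo routing; Flink's real key assignment may differ.
    int shardFor(String key) {
        return Math.floorMod(key.hashCode(), shards.size());
    }

    // A DS2 record: a non-null value inserts or updates the entry,
    // a null value is the explicit delete record that removes it.
    void applyUpdate(String key, String value) {
        Map<String, String> shard = shards.get(shardFor(key));
        if (value == null) {
            shard.remove(key);
        } else {
            shard.put(key, value);
        }
    }

    // Workaround for multiple lookups: conceptually one copy of the DS1
    // event per key, each routed to the shard owning that key; the hits
    // are collected into one result afterwards.
    Map<String, String> lookupAll(List<String> keys) {
        Map<String, String> collected = new HashMap<>();
        for (String key : keys) {
            String value = shards.get(shardFor(key)).get(key);
            if (value != null) {
                collected.put(key, value);
            }
        }
        return collected;
    }

    public static void main(String[] args) {
        ShardedStateSketch state = new ShardedStateSketch(4);
        state.applyUpdate("k1", "v1");
        state.applyUpdate("k2", "v2");
        state.applyUpdate("k2", null); // delete record evicts k2
        System.out.println(state.lookupAll(Arrays.asList("k1", "k2", "k3")));
    }
}
```

The sketch only shows the routing idea; in a real job the duplication and
the collection step would have to be expressed as part of the streaming
program itself.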

Let me add one more thing regarding the upcoming rescaling feature, in case
this is interesting for you: rescaling will also work (maybe not in the
first version) for broadcast state, i.e., state which is the same on all
parallel operator instances.
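
To make the broadcast alternative concrete, here is a similar plain-Java
sketch (again not Flink API, all names hypothetical). Every instance applies
every update and therefore holds an identical full copy, so any instance can
answer a multi-key lookup locally. The addInstance step is only an assumption
about why identical copies are easy to rescale, not a description of how
Flink implements it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation, NOT Flink API: every "instance" keeps a full copy.
class BroadcastStateSketch {
    final List<Map<String, String>> copies = new ArrayList<>();

    // Assumes parallelism >= 1.
    BroadcastStateSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            copies.add(new HashMap<>());
        }
    }

    // Broadcast: the same update is applied to every instance's copy.
    // A null value removes the entry, as with the key-value state.
    void broadcastUpdate(String key, String value) {
        for (Map<String, String> copy : copies) {
            if (value == null) {
                copy.remove(key);
            } else {
                copy.put(key, value);
            }
        }
    }

    // Hypothetical scale-up step: seed the new instance from an existing copy.
    void addInstance() {
        copies.add(new HashMap<>(copies.get(0)));
    }

    // Any instance can answer a multi-key lookup from its local copy;
    // missing keys yield null in the result list.
    List<String> lookupAll(int instance, List<String> keys) {
        List<String> values = new ArrayList<>();
        for (String key : keys) {
            values.add(copies.get(instance).get(key));
        }
        return values;
    }

    public static void main(String[] args) {
        BroadcastStateSketch state = new BroadcastStateSketch(2);
        state.broadcastUpdate("k1", "v1");
        state.broadcastUpdate("k2", "v2");
        state.addInstance();
        System.out.println(state.lookupAll(2, Arrays.asList("k1", "k2")));
    }
}
```

The price is memory: each instance stores the whole state, which is why this
only fits when the state is small enough to be replicated.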

2016-09-07 21:45 GMT+02:00 Chakravarthy varaga <chakravarthyvp@gmail.com>:

> I understand this better with your explanation.
> In this use case, each element in DS1 has to be looked up against a 'bunch
> of keys' from DS2, and DS2 could shrink or expand in terms of the number of
> keys. Will the key-value shards work in this case?
>
> On Wed, Sep 7, 2016 at 7:44 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>
>> Operator state is always local in Flink. However, with key-value state,
>> you can have something that behaves similarly to a distributed
>> hashmap, because each operator holds a different shard/partition of the
>> hashtable.
>>
>> If you have to do only a single key lookup for each element of DS1, you
>> should think about partitioning both streams (keyBy) and writing the state
>> into Flink's key-value state [1].
>>
>> This will have several benefits:
>> 1) State does not need to be replicated
>> 2) Depending on the backend (RocksDB) [2], parts of the state can reside
>> on disk. You are not bound to the memory of the JVM.
>> 3) Flink takes care of the look-up. No need to have your own hashmap.
>> 4) It will only be possible to rescale jobs with key-value state (this
>> feature is currently under development).
>>
>> If using the key-value state is possible, I'd go for that.
>>
>> Best, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state_backends.html
>>
>> 2016-09-07 19:55 GMT+02:00 Chakravarthy varaga <chakravarthyvp@gmail.com>:
>>
>>> Certainly, that is what I thought as well.
>>> The output of DataStream2 could be in the 1000s, and there are state
>>> updates.
>>> Reading this topic from the other job, Job1, is OK.
>>> However, assuming that we maintain this state in a collection and
>>> update the collection by reading from the topic, will it be
>>> replicated across the cluster within Job1?
>>>
>>>
>>>
>>> On Wed, Sep 7, 2016 at 6:33 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>>
>>>> Is writing DataStream2 to a Kafka topic and reading it from the other
>>>> job an option?
>>>>
>>>> 2016-09-07 19:07 GMT+02:00 Chakravarthy varaga <chakravarthyvp@gmail.com>:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>>     Thanks for your response. These DataStreams
>>>>> (Job1-DataStream1 & Job2-DataStream2) belong to different Flink
>>>>> applications running within the same cluster.
>>>>>     DataStream2 (from Job2) applies transformations and updates a
>>>>> 'cache' that Job1 needs to work on.
>>>>>     Our intention is to avoid an external key/value store, as we are
>>>>> trying to keep the cache local to the cluster.
>>>>>     Is there a way?
>>>>>
>>>>> Best Regards
>>>>> CVP
>>>>>
>>>>> On Wed, Sep 7, 2016 at 5:00 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Flink does not provide shared state.
>>>>>> However, you can broadcast a stream to CoFlatMapFunction, such that
>>>>>> each operator has its own local copy of the state.
>>>>>>
>>>>>> If that does not work for you because the state is too large and if
>>>>>> it is possible to partition the state (and both streams), you can
>>>>>> also use keyBy instead of broadcast.
>>>>>>
>>>>>> Finally, you can use an external system like a KeyValue Store or
>>>>>> In-Memory store like Apache Ignite to hold your distributed collection.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> 2016-09-07 17:49 GMT+02:00 Chakravarthy varaga <chakravarthyvp@gmail.com>:
>>>>>>
>>>>>>> Hi Team,
>>>>>>>
>>>>>>>      Can someone help me here? Appreciate any response !
>>>>>>>
>>>>>>> Best Regards
>>>>>>> Varaga
>>>>>>>
>>>>>>> On Mon, Sep 5, 2016 at 4:51 PM, Chakravarthy varaga <chakravarthyvp@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Team,
>>>>>>>>
>>>>>>>>     I'm working on a Flink Streaming application. The data is
>>>>>>>> ingested through Kafka connectors. The payload volume is roughly
>>>>>>>> 100K/sec. The event payload is a string. Let's call this DataStream1.
>>>>>>>> This application also uses another DataStream, call it DataStream2,
>>>>>>>> which consumes events off a Kafka topic. The elements of DataStream2
>>>>>>>> go through a transformation that finally updates a HashMap (a
>>>>>>>> java.util collection). The Flink application should share this
>>>>>>>> HashMap across the Flink cluster so that the DataStream1 application
>>>>>>>> can check the state of the values in this collection. Is there a way
>>>>>>>> to do this in Flink?
>>>>>>>>
>>>>>>>>     I don't see any shared collection that can be used within the cluster.
>>>>>>>>
>>>>>>>> Best Regards
>>>>>>>> CVP
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
