flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: How to perform this join operation?
Date Wed, 04 May 2016 10:17:41 GMT
Hi Elias,
Samza and Flink operate at slightly different abstraction layers here. The
Samza KeyValueStore basically has the interface of a Java HashMap: the key
is always explicit in the method call, and only a simple put and get per
key are supported. Flink state, such as ValueState, ListState, and
ReducingState, is implicitly scoped to the key of the input element and
allows different "shapes" of state. For example, ListState can use an
efficient merge operation to append to the state, instead of the
get-update-put cycle that the Samza KeyValueStore would require.

In code, this is what Samza does:

class Operator {
   KeyValueStore<K, V> store = ...

   void process(KV<K, V> element) {
      // the key must be passed explicitly on every access
      value = store.get(element.getKey())
      ...
      store.put(element.getKey(), ...)
   }
}

while this is what Flink does:

class Operator {
   ValueState<V> state = ...

   // internal: the system scopes the state to the element's key
   void internalProcess(KV<K, V> element) {
      state.setKey(element.getKey())
      process(element)
   }

   // user code: the key never appears; get() and update() implicitly
   // refer to the state of the current key
   void process(KV<K, V> element) {
      value = state.get()
      ...
      state.update(...)
   }
}

In Flink we handle the keys internally, which makes it easier for us to
implement things like automatic scaling with rebalancing of keyed state
(Till is working on this right now). Underneath we have something similar
to the KeyValueStore; if you want, you can write a custom operator that
deals with these details directly and handles key management itself. What
we don't support right now is iterating over the state of all locally held
keys. I'm changing this, however, in this PR:
https://github.com/apache/flink/pull/1957.
Then you can do everything that you can do with the Samza KeyValueStore,
plus a bit more, because we have more specific types of state that exploit
features such as merge instead of get-update-put.
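
To make this concrete, here is a minimal sketch of appending to keyed
ListState from user code. It assumes a Flink version that exposes ListState
through the RuntimeContext; the class name and descriptor name are made up
for illustration:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Collects all values seen for the current key. The key never appears in
// the user code: the state is implicitly scoped to the key of the element
// being processed.
public class CollectPerKey extends RichFlatMapFunction<Long, String> {

    private transient ListState<Long> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getListState(
                new ListStateDescriptor<>("seen-values", Long.class));
    }

    @Override
    public void flatMap(Long value, Collector<String> out) throws Exception {
        // a single append; with the RocksDB backend this can be served by
        // a merge rather than a get-update-put over the whole list
        seen.add(value);
        out.collect("added " + value);
    }
}

This would run on a keyed stream, e.g.
stream.keyBy(...).flatMap(new CollectPerKey()).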

I hope this clarifies things a bit. :-)

Cheers,
Aljoscha

On Wed, 4 May 2016 at 00:28 Elias Levy <fearsome.lucidity@gmail.com> wrote:

> Till,
>
> Thanks again for putting this together.  It is certainly along the lines
> of what I want to accomplish, but I see a problem with it.  In your code
> you use a ValueState to store the priority queue.  If you expect to store
> a lot of values in the queue, then you are likely to be using RocksDB as
> the state backend.  But if you use a ValueState for the priority queue
> with RocksDB as the backend, the whole priority queue will be deserialized
> and re-serialized each time you add an item to it.  That will become a
> crushing cost as the queue grows.
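>
> To illustrate, this is roughly the access pattern I mean (a sketch only;
> the Event type and all names here are illustrative, not Till's actual
> code):
>
> import java.util.PriorityQueue;
>
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.util.Collector;
>
> // assumes an Event POJO that implements Comparable<Event>
> public class EnqueueFunction extends RichFlatMapFunction<Event, Event> {
>
>     private transient ValueState<PriorityQueue<Event>> queueState;
>
>     @Override
>     public void open(Configuration parameters) {
>         queueState = getRuntimeContext().getState(
>                 new ValueStateDescriptor<>("queue",
>                         TypeInformation.of(
>                                 new TypeHint<PriorityQueue<Event>>() {})));
>     }
>
>     @Override
>     public void flatMap(Event event, Collector<Event> out)
>             throws Exception {
>         // with the RocksDB backend, value() deserializes the entire
>         // queue and update() re-serializes it, so the cost of adding a
>         // single element grows with the size of the queue
>         PriorityQueue<Event> queue = queueState.value();
>         if (queue == null) {
>             queue = new PriorityQueue<>();
>         }
>         queue.add(event);
>         queueState.update(queue);
>     }
> }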
>
> I could instead use a ListState with the RocksDB backend; that way only
> the single value being added is serialized on an add.  But the get()
> operation in RocksDBListState seems very inefficient, defeating the idea
> of working with data sets that don't fit in memory.  It loads all values
> into a List instead of returning an Iterable that walks the list via the
> RocksDB scan API.  Samza has the advantage here, as it provides an
> ordered KV state API that lets you truly iterate over the values in
> RocksDB, plus a caching layer to batch writes into RocksDB.  I am
> surprised there is no OrderedKeyValueStore API in Flink.
>
> Given that only the RocksDB backend can store more state than fits in
> memory, and given the cost of its get() method when keeping track of a
> list, there seems to be no good way to keep track of large state in the
> form of a list or ordered list in Flink right now.
>
>
> On Wed, Apr 20, 2016 at 10:13 AM, Till Rohrmann <trohrmann@apache.org>
> wrote:
>
>> The stream operator would do the following: collect the inputs from
>> (x, y) and z, which are already keyed; thus we know that x = z holds.
>> Using a priority queue we order the elements, because we don't know in
>> which order they arrive at the operator.  Whenever we receive a
>> watermark indicating that no earlier events can arrive anymore, we can
>> go through the two priority queues and join the elements.  The queues
>> are part of the operator's state, so that we don't lose information in
>> case of a recovery.
>>
>> I've sketched such an operator here [1].  I hope this helps you to get
>> started.
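>>
>> In compact form the idea looks roughly like this (a simplified sketch,
>> not the code in [1]; it uses a CoProcessFunction with event-time timers
>> instead of a custom operator, and all names are illustrative):
>>
>> import org.apache.flink.api.common.state.ListState;
>> import org.apache.flink.api.common.state.ListStateDescriptor;
>> import org.apache.flink.api.common.typeinfo.TypeHint;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
>> import org.apache.flink.util.Collector;
>>
>> public class WatermarkJoin extends CoProcessFunction<
>>         Tuple2<String, Integer>, Tuple2<String, Long>, String> {
>>
>>     private transient ListState<Tuple2<String, Integer>> xyBuffer;
>>     private transient ListState<Tuple2<String, Long>> zBuffer;
>>
>>     @Override
>>     public void open(Configuration parameters) {
>>         xyBuffer = getRuntimeContext().getListState(
>>                 new ListStateDescriptor<>("xy", TypeInformation.of(
>>                         new TypeHint<Tuple2<String, Integer>>() {})));
>>         zBuffer = getRuntimeContext().getListState(
>>                 new ListStateDescriptor<>("z", TypeInformation.of(
>>                         new TypeHint<Tuple2<String, Long>>() {})));
>>     }
>>
>>     @Override
>>     public void processElement1(Tuple2<String, Integer> xy, Context ctx,
>>             Collector<String> out) throws Exception {
>>         // buffer the element and ask for a callback once the watermark
>>         // passes its timestamp (assumes timestamped elements)
>>         xyBuffer.add(xy);
>>         ctx.timerService().registerEventTimeTimer(ctx.timestamp());
>>     }
>>
>>     @Override
>>     public void processElement2(Tuple2<String, Long> z, Context ctx,
>>             Collector<String> out) throws Exception {
>>         zBuffer.add(z);
>>         ctx.timerService().registerEventTimeTimer(ctx.timestamp());
>>     }
>>
>>     @Override
>>     public void onTimer(long timestamp, OnTimerContext ctx,
>>             Collector<String> out) throws Exception {
>>         // the watermark has passed, so no earlier elements can arrive;
>>         // join everything buffered for the current key
>>         for (Tuple2<String, Integer> xy : xyBuffer.get()) {
>>             for (Tuple2<String, Long> z : zBuffer.get()) {
>>                 out.collect(xy.f0 + "," + xy.f1 + "," + z.f1);
>>             }
>>         }
>>         xyBuffer.clear();
>>         zBuffer.clear();
>>     }
>> }
>>
>> This would be applied as
>> xyStream.connect(zStream).keyBy(0, 0).process(new WatermarkJoin()).
>> It is simplified in that it joins and clears everything buffered so
>> far; a full version would join only elements with timestamps below the
>> watermark and keep later ones buffered.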
>>
>
