flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stephan Ewen <se...@apache.org>
Subject Re: How to perform this join operation?
Date Wed, 25 May 2016 12:04:16 GMT
Hi Elias!

I think you brought up a couple of good issues. Let me try and summarize
what we have so far:

1) Joining in a more flexible fashion
    => The problem you are solving with the trailing / sliding window
combination: Is the right way to phrase the join problem "join records
where key is equal and timestamps are within X seconds (millis/minutes/...)
of each other"?
    => That should definitely have an API abstraction. The first version
could me implemented exactly with a combination of sliding and trailing

    => For joins between windowed and non windowed streams in the long run:
Aljoscha posted the Design Doc on side inputs. Would that cover the use
case as a long-term solution?

2) Lists that are larger than the memory
    => The ListState returns an Iterable, but it is eagerly materialized
from RocksDB. Is there a way to "stream" the bytes from RocksDB? Flink
could then deserialize them in a streamed fashion as well.

3) Can you elaborate a bit on the OrderedListState? Do you think of
multiple values (ordered) per key, or a sequence of key/value pairs,
ordered by key?
    => Currently Flink limits the scope of key accesses to the values
current key (as defined in the keyBy() function). That way, the system can
transparently redistribute keys when changing the parallelism.


On Sat, May 21, 2016 at 12:24 AM, Elias Levy <fearsome.lucidity@gmail.com>

> Till,
> An issue with your suggestion is that the job state may grow unbounded.
> You are managing
> expiration of data from the cache in the operator, but the state is
> partitioned by the stream key.
> That means if we no longer observe a key, the state associated with that
> key will never be
> removed.
> In my data set keys come and go, and many will never be observed again.
> That will lead to
> continuous state growth over time.
> On Mon, May 2, 2016 at 6:06 PM, Elias Levy <fearsome.lucidity@gmail.com>
> wrote:
>> Thanks for the suggestion.  I ended up implementing it a different way.
>> [...]
>> On Wed, Apr 20, 2016 at 10:13 AM, Till Rohrmann <trohrmann@apache.org>
>> wrote:
>>> orry for the late reply. You're right that with the windowed join you
>>> would have to deal with pairs where the timestamp of (x,y) is not
>>> necessarily earlier than the timestamp of z. Moreover, by using sliding
>>> windows you would receive duplicates as you've described. Using tumbling
>>> windows would mean that you lose join matches if (x,y) lives in an earlier
>>> window. Thus, in order to solve your problem you would have to write a
>>> custom stream operator.
>>> The stream operator would do the following: Collecting the inputs from
>>> (x,y) and z which are already keyed. Thus, we know that x=z holds true.
>>> Using a priority queue we order the elements because we don't know how the
>>> arrive at the operator. Whenever we receive a watermark indicating that no
>>> earlier events can arrive anymore, we can go through the two priority
>>> queues to join the elements. The queues are part of the operators state so
>>> that we don't lose information in case of a recovery.
>>> I've sketched such an operator here [1]. I hope this helps you to get
>>> started.

View raw message