flink-user mailing list archives

From Fridtjof Sander <fsan...@mailbox.tu-berlin.de>
Subject Re: join with no element appearing in multiple join-pairs
Date Mon, 01 Feb 2016 11:32:40 GMT
Hi Till,

thanks for your reply!

The problem with that is that I sometimes combine two elements:

So from x0 -> x1 -> x2 I join (x0, x1), which might become x0 -> x2 in 
the end.

The indices from zipWithIndex are then 0 and 2, resulting in equal join 
flags. Sequential elements always have to have alternating flags, which 
is violated here.
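
A minimal local sketch of this failure, using plain Scala collections rather than the Flink DataSet API. `Elem`, `FixedIdParity`, `merge` and `alternates` are hypothetical names. The sketch assumes that a combined pair keeps the left element's key and id and takes over the right element's next pointer, and that flags are computed as `id % 2 == 0`.

```scala
// Hypothetical element: its key, the key of its successor (None for the last
// element), and an id assigned once before the iteration starts.
final case class Elem(key: Int, next: Option[Int], id: Long)

object FixedIdParity {
  // Join flag as in the original approach: even id -> left side of the join.
  def flag(e: Elem): Boolean = e.id % 2 == 0

  // Combine a with its successor b: a keeps its key and id and takes over b's next pointer.
  def merge(a: Elem, b: Elem): Elem = a.copy(next = b.next)

  // True if every two consecutive elements of the chain carry different flags.
  def alternates(chain: Seq[Elem]): Boolean =
    chain.zip(chain.drop(1)).forall { case (a, b) => flag(a) != flag(b) }
}
```

For the chain from the original message (keys 2, 10, 11, 25 with ids 0 to 3), `alternates` holds before the merge and fails afterwards, because the combined element (id 0) is now directly followed by x2 (id 2).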

Best
Fridtjof

On 01.02.16 at 12:26, Till Rohrmann wrote:
>
> Hi Fridtjof,
>
> I might be missing something, but can’t you assign the ids once before 
> starting the iteration and then reuse them throughout the iterations? 
> Of course you would have to add another field to your input data, but 
> then you don’t have to run zipWithIndex in every iteration.
>
> Cheers,
> Till
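
A minimal sketch of this suggestion, with plain Scala collections standing in for a DataSet: ids are attached once with `zipWithIndex` before the iteration, and each round derives the join flag from the stored id and the round number instead of re-indexing. `Elem`, `IdsOnce`, `withIds` and `isLeft` are hypothetical names. Note that Flink's `zipWithIndex` on a DataSet yields `(index, element)` tuples, whereas the Scala collection method yields `(element, index)`.

```scala
// Hypothetical element with an extra id field, defaulting to "not yet assigned".
final case class Elem(key: Int, next: Option[Int], id: Long = -1L)

object IdsOnce {
  // Run once, before the iteration: store each element's position in the sorted input.
  def withIds(sorted: Seq[Elem]): Seq[Elem] =
    sorted.zipWithIndex.map { case (e, i) => e.copy(id = i.toLong) }

  // Per round: round 0 puts even ids on the left side, round 1 puts odd ids there.
  def isLeft(e: Elem, round: Int): Boolean = (e.id + round) % 2 == 0
}
```

This avoids re-indexing in every round, but, as the reply above points out, the stored ids stop alternating along the chain once elements are combined.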
>
>
> On Mon, Feb 1, 2016 at 11:37 AM, Fridtjof Sander 
> <fsander@mailbox.tu-berlin.de> wrote:
>
>     (tried to reformat)
>
>
>     Hi,
>
>     I have a problem which seems to be unsolvable in Flink at the
>     moment (1.0-Snapshot, current master branch)
>     and I would kindly ask for some input, ideas on alternative
>     approaches or just a confirmatory "yup, that doesn't work".
>
>     ### Here's the situation:
>
>     I have a dataset whose elements are totally ordered, sorted
>     ascending by some key (Int). Each element has a "next-pointer" to
>     its successor, which is just another field holding the key of the
>     following element:
>
>     x0 -> x1 -> x2 -> x3 -> ... -> xn
>
>     The keys do not necessarily increase by 1; for example, x0 may
>     have key 2, x1 key 10, x2 key 11, x3 key 25, and so on. I need to
>     process that set in the following way:
>
>     iterate:
>
>     find all pairs of elements where "next == key" BUT make sure no
>     element appears in multiple pairs
>
>     example: do pair (x0, x1), (x2, x3), (x4, x5), ... but don't pair
>     (x1, x2), (x3, x4), ...
>
>     then, if some condition is met, combine a pair
>
>     run the above procedure again with the pairing condition switched:
>
>     example: do pair (x1, x2), (x3, x4), (x5, x6), ... but don't pair
>     (x0, x1), (x2, x3), ...
>
>     I hope the problem is clear...
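
As a reference for the intended semantics (ignoring distribution), the two alternating pairings can be written sequentially over the sorted chain. This is a minimal sketch with plain Scala collections; `Elem`, `Pairing`, `pairs` and `linked` are hypothetical names, and the "combine if some condition is met" step is left out.

```scala
// Hypothetical element: its key and the key of its successor (None for the last one).
final case class Elem(key: Int, next: Option[Int])

object Pairing {
  // Round 0 pairs (x0, x1), (x2, x3), ...; round 1 pairs (x1, x2), (x3, x4), ...
  // An element without a partner in the current round stays unpaired.
  def pairs[A](chain: Seq[A], round: Int): List[(A, A)] =
    chain.drop(round % 2).grouped(2).collect { case Seq(a, b) => (a, b) }.toList

  // Sanity check: each produced pair must satisfy left.next == right.key.
  def linked(p: (Elem, Elem)): Boolean = p._1.next.contains(p._2.key)
}
```

Because each element belongs to at most one group of two, no element can appear in more than one pair per round; the hard part is reproducing this position-based grouping on a distributed DataSet.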
>
>
>     ### Now my approach: pseudo-scala-code:
>
>
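>     // zipWithIndex is provided by Flink's DataSetUtils and yields
>     // (index, element) tuples; setFlag is a method of my element type
>     val indexed = input.zipWithIndex
>     val flagged = indexed.map { case (i, el) => el.setFlag(i % 2 == 0) }
>     val left = flagged.filter(el => el.flag)
>     val right = flagged.filter(el => !el.flag)
>     left.fullOuterJoin(right)
>       .where(el => el.next)
>       .equalTo(el => el.key)
>       ...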
>
>
>     I attach a temporary key to my elements with zipWithIndex; it
>     increases by 1. Then I map that tempKey to a boolean joinFlag:
>     true if the key is even, false if it is odd. Then I filter all
>     elements flagged true into a dataset that forms the left side of
>     the next == key join. The right side consists of all elements with
>     flag == false. In the second run, I switch the flag construction to
>     el.setFlag(i % 2 != 0).
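
The flag-and-split step can be simulated locally to check that it yields the intended pairs as long as the input order is intact. This sketch uses plain Scala collections instead of Flink, and inner-join semantics instead of the `fullOuterJoin` above, so unmatched elements are simply dropped; `Elem`, `FlagJoin` and `flagJoin` are hypothetical names.

```scala
// Hypothetical element: its key and the key of its successor (None for the last one).
final case class Elem(key: Int, next: Option[Int])

object FlagJoin {
  // Index the (sorted) elements, flag them by index parity, split into left/right,
  // and join left.next == right.key. Round 1 corresponds to `i % 2 != 0`.
  def flagJoin(sorted: Seq[Elem], round: Int): Seq[(Elem, Elem)] = {
    val flagged = sorted.zipWithIndex.map { case (el, i) => (el, (i + round) % 2 == 0) }
    val left = flagged.collect { case (el, true) => el }
    val rightByKey = flagged.collect { case (el, false) => el.key -> el }.toMap
    // Keep only left elements whose successor landed on the right side.
    left.flatMap(l => l.next.flatMap(rightByKey.get).map(r => (l, r)))
  }
}
```

With the keys 2, 10, 11, 25 in sorted order, round 0 yields the pairs (2, 10) and (11, 25) and round 1 yields (10, 11). The result depends entirely on the order in which elements reach the indexing step.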
>
>     That actually works, there is only one problem:
>
>
>     ### The problem:
>
>
>     In my approach, I must not lose the total ordering of the data,
>     because the assignment of alternating join flags only works if that
>     ordering is preserved. Initially the ordering is established by
>     range-partitioning and partition-sorting. However, it is destroyed
>     when the data is shuffled for the join. And I cannot restore it,
>     because I have to run the whole thing in an iteration, and
>     range-partitioning is not supported within iterations.
>
>
>     ### Help?
>
>     It all sounds very complicated, but the only thing I really have
>     to solve is that join without any element appearing in multiple
>     pairs (as described in "the situation"). If anyone has an idea
>     how to solve this, that person would really make my day...
>
>     Anyways, thanks for your time!
>
>     Best, Fridtjof
>