Hi Till,
thanks for your reply!
The problem with that is that I sometimes combine two elements:
from x0 -> x1 -> x2 I join (x0, x1), which might leave x0 -> x2 in
the end.
The indices from zipWithIndex are then 0 and 2, resulting in equal join
flags. Sequential elements always have to have alternating flags, which
is violated here.
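
A minimal sketch of what I mean, on plain Scala collections rather than a Flink DataSet. The type El, the field idx (an index assigned once up front) and the combine function are made up for illustration; note that zipWithIndex on Scala collections yields (element, index), the reverse of the order in my Flink pseudocode.

```scala
// Hypothetical element type: `idx` stands in for an index assigned once, before the iteration.
final case class El(key: Int, next: Option[Int], idx: Long) {
  // Join flag derived from the index: true for even, false for odd.
  def flag: Boolean = idx % 2 == 0
}

object StaleIndexDemo {
  // Chain x0 -> x1 -> x2 with the example keys 2, 10 and 11.
  val chain: List[El] = List(
    El(2, Some(10), 0L),
    El(10, Some(11), 1L),
    El(11, None, 2L)
  )

  // Assumed combine step: keep x0's key and index, inherit x1's next-pointer.
  def combine(l: El, r: El): El = l.copy(next = r.next)

  // After combining (x0, x1) the chain is x0 -> x2, with indices 0 and 2.
  val afterMerge: List[El] = combine(chain(0), chain(1)) :: chain.drop(2)

  val flagsBefore: List[Boolean] = chain.map(_.flag)      // alternating
  val flagsAfter: List[Boolean] = afterMerge.map(_.flag)  // both true: no longer alternating

  // Re-indexing the shortened chain restores the alternation.
  val reindexed: List[El] =
    afterMerge.zipWithIndex.map { case (el, i) => el.copy(idx = i.toLong) }
  val flagsReindexed: List[Boolean] = reindexed.map(_.flag)
}
```

So the old indices only work as long as nothing has been combined, which is why I re-run zipWithIndex in every iteration.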
Best
Fridtjof
On 01.02.16 at 12:26, Till Rohrmann wrote:
>
> Hi Fridtjof,
>
> I might be missing something, but can’t you assign the ids once before
> starting the iteration and then reuse them throughout the iterations?
> Of course you would have to add another field to your input data but
> then you don’t have to run the zipWithIndex for every iteration.
>
> Cheers,
> Till
>
>
>
> On Mon, Feb 1, 2016 at 11:37 AM, Fridtjof Sander
> <fsander@mailbox.tuberlin.de> wrote:
>
> (tried to reformat)
>
>
> Hi,
>
> I have a problem which seems to be unsolvable in Flink at the
> moment (1.0-SNAPSHOT, current master branch)
> and I would kindly ask for some input, ideas on alternative
> approaches or just a confirmatory "yup, that doesn't work".
>
> ### Here's the situation:
>
> I have a dataset whose elements are sorted in total ascending order
> by some key (Int). Each element has a "next-pointer" to its
> successor, which is just another field holding the key of the
> following element:
>
> x0 -> x1 -> x2 -> x3 -> ... -> xn
>
> The keys are not necessarily increasing by 1, so it may be that:
> x0 has key 2 and x1 has key 10, x2 has 11, x3 has 25 and so on. I
> need to process that set in the following way:
>
> iterate:
>
> find all pairs of elements where "next == key" BUT make sure no
> element appears in multiple pairs
>
> example: do pair (x0, x1), (x2, x3), (x4, x5), ... but don't pair
> (x1, x2), (x3, x4), ...
>
> then, if some condition is met, combine a pair
>
> run the above procedure again with the pairing condition switched:
>
> example: do pair (x1, x2), (x3, x4), (x5, x6), ... but don't pair
> (x0, x1), (x2, x3), ...
>
> I hope the problem is clear...
>
>
> ### Now my approach (pseudo-Scala code):
>
>
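> // attach a temporary index 0, 1, 2, ... to every element
> val indexed = input.zipWithIndex
> // even index => flag true, odd index => flag false
> val flagged = indexed.map { case (i, el) => el.setFlag(i % 2 == 0) }
> val left = flagged.filter(el => el.flag)
> val right = flagged.filter(el => !el.flag)
> // pair each flagged element with its unflagged successor
> left.fullOuterJoin(right)
>   .where(_.next)
>   .equalTo(_.key)
>   ...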
>
>
> With zipWithIndex I attach a temporary key to my elements that
> increases by 1. Then I map that tempKey to a boolean joinFlag:
> true if tempKey is even, false if it is odd. Then I filter all
> elements with flag == true and put them in a dataset that is the
> left side of the "next == key" join. The right side consists of all
> elements with flag == false. In the second run, I switch the flag
> construction to el.setFlag(i % 2 != 0).
>
> That actually works, there is only one problem:
>
>
> ### The problem:
>
>
> In my approach, I must not lose the total ordering of the data,
> because the assignment of alternating join flags only works if that
> ordering is preserved. Initially it is established by range
> partitioning and partition sorting. However, that ordering is
> destroyed when the data is shuffled for the join. And I cannot
> restore it, because I have to run the whole thing in an iteration,
> and range partitioning is not supported within iterations.
>
>
> ### Help?
>
> It sounds all very complicated, but the only thing I really have
> to solve is that join without any element appearing in multiple
> pairs (as described in "the situation"). If anyone has any idea
> how to solve this, that person would make my day so hard...
>
> Anyways, thanks for your time!
>
> Best, Fridtjof
>
>
>
>
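
For reference, the pairing rule from the original question can be sketched on a local, already sorted Scala collection. This is only an illustration of the rule itself: it is not a Flink DataSet program and does not address the partitioning issue, and the object name, the pairs helper and the example keys beyond 2, 10, 11, 25 are assumptions.

```scala
object AlternatingPairs {
  // Pair neighbours of an already sorted sequence, starting at `offset` (0 or 1),
  // so that no element appears in more than one pair. A trailing element
  // without a partner is simply left unpaired.
  def pairs[A](sorted: Seq[A], offset: Int): List[(A, A)] =
    sorted.drop(offset).grouped(2).collect { case Seq(l, r) => (l, r) }.toList

  // Keys x0..x4; they do not have to increase by 1.
  val keys: Seq[Int] = Seq(2, 10, 11, 25, 30)

  // First run: (x0, x1), (x2, x3)
  val firstRun: List[(Int, Int)] = pairs(keys, 0)

  // Second run with the pairing condition switched: (x1, x2), (x3, x4)
  val secondRun: List[(Int, Int)] = pairs(keys, 1)
}
```

Locally the offset plays the role of the alternating join flag; the difficulty described above is obtaining the same neighbour relation after the data has been shuffled.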
