flink-user mailing list archives

From Fridtjof Sander <fsan...@mailbox.tu-berlin.de>
Subject join with no element appearing in multiple join-pairs
Date Mon, 01 Feb 2016 10:32:46 GMT
Hi,

I have a problem which seems to be unsolvable in Flink at the moment (1.0-Snapshot, current
master branch)
and I would kindly ask for some input, ideas on alternative approaches or just a confirmatory
"yup, that doesn't work".

### Here's the situation:

I have a dataset whose elements are totally sorted in ascending order by
some key (Int). Each element has a "next-pointer" to its successor, which
is just another field holding the key of the following element:

    x0 -> x1 -> x2 -> x3 -> ... -> xn

The keys do not necessarily increase by 1, so it may be that x0 has key 2,
x1 has key 10, x2 has key 11, x3 has key 25 and so on.

I need to process that set in the following way:

iterate:
    find all pairs of elements where "next == key",
    BUT make sure no element appears in multiple pairs
        example: do pair (x0, x1), (x2, x3), (x4, x5), ...
                 but don't pair (x1, x2), (x3, x4), ...
    then, if some condition is met, combine a pair
    run the above procedure again with switched pairing-condition:
        example: do pair (x1, x2), (x3, x4), (x5, x6), ...
                 do not pair (x0, x1), (x2, x3), ...

I hope the problem is clear...

### Now my approach:

pseudo-scala-code:

    val indexed = input.zipWithIndex
    // flag is true for even positions, false for odd positions
    val flagged = indexed.map { case (i, el) => el.setFlag(i % 2 == 0) }
    val left = flagged.filter(el => el.flag)
    val right = flagged.filter(el => !el.flag)
    left.fullOuterJoin(right)
        .where(_.next)
        .equalTo(_.key)
        ...

I attach a temporary key to my elements that increases by 1, using
zipWithIndex. Then I map that tempKey to a boolean joinFlag: true if the
key is even, false if it is odd. Then I filter all elements with
flag == true and put them in a dataset that is the left side of the
"next == key" join. The right side consists of all elements with
flag == false.

In the second run, I switch the flag construction to
el.setFlag(i % 2 != 0).

That actually works, there is only one problem:

### The problem:

In my approach, I must not lose the total ordering of the data, because
the assignment of alternating join-flags only works if that ordering is
preserved. Initially, it is established by range-partitioning and
partition-sorting. However, that ordering is destroyed when the data is
shuffled for the join. And I cannot restore it, because I have to run the
whole thing in an iteration, and range-partitioning is not supported
within iterations.

### Help?

It all sounds very complicated, but the only thing I really have to solve
is that join without any element appearing in multiple pairs (as
described in "the situation"). If anyone has any idea how to solve this,
that person would make my day so hard...

Anyways, thanks for your time!

Best,
Fridtjof
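
P.S. To make the desired pairing concrete, here is a minimal local sketch
using plain Scala collections, not the Flink DataSet API. It only shows the
result I am after on a small, already sorted sequence and does not solve the
distributed ordering problem. The names Elem, pairAlternating and startEven
are made up for illustration, and using -1 as the "next" of the last element
is an assumption. Unlike the fullOuterJoin in the pseudo-code, the sketch
simply drops elements that find no partner.

```scala
object AlternatingPairs {
  // Hypothetical element type: a key plus the key of the successor.
  case class Elem(key: Int, next: Int)

  // Pairs every element at a position of the chosen parity with its
  // successor (left.next == right.key). Left and right sides are disjoint,
  // so no element can appear in more than one pair.
  def pairAlternating(sorted: Seq[Elem], startEven: Boolean): Seq[(Elem, Elem)] = {
    val indexed = sorted.zipWithIndex
    val wantedParity = if (startEven) 0 else 1
    // Left side: elements whose position has the wanted parity.
    val left = indexed.collect { case (el, i) if i % 2 == wantedParity => el }
    // Right side: all remaining elements, indexed by their key.
    val rightByKey =
      indexed.collect { case (el, i) if i % 2 != wantedParity => el.key -> el }.toMap
    // Inner join on left.next == right.key; unmatched elements are dropped.
    left.flatMap(l => rightByKey.get(l.next).map(r => (l, r)))
  }
}
```

With the keys 2, 10, 11, 25, 30 from the example above, a first run with
startEven = true pairs (2, 10) and (11, 25), and a second run with
startEven = false pairs (10, 11) and (25, 30).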

