flink-user mailing list archives

From Till Rohrmann <till.rohrm...@gmail.com>
Subject Re: Left outer join
Date Fri, 17 Apr 2015 08:04:51 GMT
No, it's not, but at the moment there is AFAIK no other way around it. There
is an issue for proper outer join support [1].

[1] https://issues.apache.org/jira/browse/FLINK-687
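
For illustration, here is a minimal sketch in plain Java (no Flink
dependency) of the buffering pattern the quoted Scala snippet relies on. The
class and method names are hypothetical. It assumes the function receives the
left and right elements of one key group as single-pass iterators, so the
buffer holds the right side of that one group rather than the whole dataset;
the memory risk is then a single key with very many right-side elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper, not part of the Flink API.
class LeftOuterJoinSketch {

    /**
     * Left outer join of the elements of one key group.
     * The right side is copied into a list once so that it can be re-read
     * for every left element; the left side is consumed in a single pass.
     */
    static List<String[]> joinGroup(Iterator<String> left, Iterator<String> right) {
        // Buffer the right side: a single-pass iterator cannot be restarted.
        List<String> rightBuffer = new ArrayList<>();
        while (right.hasNext()) {
            rightBuffer.add(right.next());
        }

        List<String[]> out = new ArrayList<>();
        while (left.hasNext()) {
            String l = left.next();
            if (rightBuffer.isEmpty()) {
                // No partner on the right: keep the left element, pad with null.
                out.add(new String[] {l, null});
            } else {
                for (String r : rightBuffer) {
                    out.add(new String[] {l, r});
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> rows = joinGroup(
                Arrays.asList("l1", "l2").iterator(),
                Collections.<String>emptyIterator());
        for (String[] row : rows) {
            System.out.println(Arrays.toString(row));
        }
        // prints:
        // [l1, null]
        // [l2, null]
    }
}
```

Only the right side is buffered here; the left side is streamed, which keeps
the memory footprint to one side of one group.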

On Fri, Apr 17, 2015 at 10:01 AM, Flavio Pompermaier <pompermaier@okkam.it>
wrote:

> That could solve the problem, but is accumulating elements in a local
> variable safe if the datasets are huge?
>
> On Fri, Apr 17, 2015 at 9:54 AM, Till Rohrmann <till.rohrmann@gmail.com>
> wrote:
>
>> If it's fine when you have null string values in the cases where
>> D1.f1!="a1" or D1.f2!="a2" then a possible solution could look like (with
>> Scala API):
>>
>> val ds1: DataSet[(String, String, String)] = getDS1
>> val ds2: DataSet[(String, String, String)] = getDS2
>>
>> ds1.coGroup(ds2).where(2).equalTo(0) {
>>   (left, right, collector: Collector[(String, String, String, String)]) => {
>>     if (right.isEmpty) {
>>       // No match on the right side: emit the left elements with a null key.
>>       left foreach { element =>
>>         val value1 = if (element._2 == "a1") element._3 else null
>>         val value2 = if (element._2 == "a2") element._3 else null
>>         collector.collect((element._1, null, value1, value2))
>>       }
>>     } else {
>>       // Materialize the right side so it can be traversed once per left element.
>>       val array = right.toArray
>>       for (leftElement <- left) {
>>         val value1 = if (leftElement._2 == "a1") leftElement._3 else null
>>         val value2 = if (leftElement._2 == "a2") leftElement._3 else null
>>
>>         for (rightElement <- array) {
>>           collector.collect((leftElement._1, rightElement._1, value1, value2))
>>         }
>>       }
>>     }
>>   }
>> }
>>
>> Does this solve your problem?
>>
>> On Fri, Apr 17, 2015 at 9:30 AM, Flavio Pompermaier
>> <pompermaier@okkam.it> wrote:
>>
>>> Hi Till,
>>> thanks for the reply.
>>> What I'd like to do is to merge D1 and D2 if there's a ref from D1 to D2
>>> (D1.f2==D2.f0).
>>> If this condition is true, I would like to produce a set of tuples with
>>> the matching elements in the first two places (D1.*f2*, D2.*f0*) and, in
>>> the other two places, the values (if present) of the matching tuples in
>>> D1 where D1.f1==*"a1"* and D1.f2==*"a2"* (string values) respectively.
>>> (PS: For each value of D1.f0 you can have at most one value of a1 and a2)
>>>
>>> Is that clearer?
>>>
>>> On Fri, Apr 17, 2015 at 9:03 AM, Till Rohrmann <till.rohrmann@gmail.com>
>>> wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> I don't really understand what you are trying to do. What does
>>>> D1.f2(D1.f1==p1) mean? What happens if the condition in D1.f2(if
>>>> D1.f1==p2) is false?
>>>>
>>>> Where do the values a1 and a2 in (A, X, a1, a2) come from when you
>>>> join [(A, p3, X), (X, s, V)] and [(A, p3, X), (X, r, 2)]? Maybe you can
>>>> elaborate a bit more on your example.
>>>>
>>>> Cheers,
>>>>
>>>> Till
>>>>
>>>> On Thu, Apr 16, 2015 at 10:09 PM, Flavio Pompermaier
>>>> <pompermaier@okkam.it> wrote:
>>>>
>>>>> I cannot find a solution to my use case :(
>>>>> I have 2 datasets D1 and D2 like:
>>>>>
>>>>> D1:
>>>>> A,p1,a1
>>>>> A,p2,a2
>>>>> A,p3,X
>>>>> B,p3,Y
>>>>> B,p1,b1
>>>>>
>>>>> D2:
>>>>> X,s,V
>>>>> X,r,2
>>>>> Y,j,k
>>>>>
>>>>> I'd like to have a unique dataset D3(Tuple4) like
>>>>>
>>>>> A,X,a1,a2
>>>>> B,Y,b1,null
>>>>>
>>>>> Basically filling with <D1.f0,D2.f0,D1.f2(D1.f1==p1),D1.f2(if
>>>>> D1.f1==p2)> when D1.f2==D2.f0.
>>>>> Is that possible and how?
>>>>> Could you show me a simple snippet?
>>>>>
>>>>> Thanks in advance,
>>>>> Flavio
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Apr 16, 2015 at 9:48 PM, Till Rohrmann <trohrmann@apache.org>
>>>>> wrote:
>>>>>
>>>>>> You can materialize the right input by creating an array out of it,
>>>>>> for example. Then you can iterate over it as often as needed.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>> On Apr 16, 2015 7:37 PM, "Flavio Pompermaier" <pompermaier@okkam.it>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Maximilian,
>>>>>>> I tried your solution but it doesn't work because the rightElements
>>>>>>> iterator cannot be used more than once:
>>>>>>>
>>>>>>> Caused by: org.apache.flink.util.TraversableOnceException: The
>>>>>>> Iterable can be iterated over only once. Only the first call to
>>>>>>> 'iterator()' will succeed.
>>>>>>>
>>>>>>> On Wed, Apr 15, 2015 at 12:59 PM, Maximilian Michels
>>>>>>> <mxm@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Flavio,
>>>>>>>>
>>>>>>>> Here's a simple example of a Left Outer Join:
>>>>>>>> https://gist.github.com/mxm/c2e9c459a9d82c18d789
>>>>>>>>
>>>>>>>> As Stephan pointed out, this can be very easily modified to
>>>>>>>> construct a Right Outer Join (just exchange leftElements and
>>>>>>>> rightElements in the two loops).
>>>>>>>>
>>>>>>>> Here's an excerpt with the most important part, the coGroup
>>>>>>>> function:
>>>>>>>>
>>>>>>>> public static class LeftOuterJoin implements
>>>>>>>>       CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, Integer>> {
>>>>>>>>
>>>>>>>>    @Override
>>>>>>>>    public void coGroup(Iterable<Tuple2<Integer, String>> leftElements,
>>>>>>>>                        Iterable<Tuple2<Integer, String>> rightElements,
>>>>>>>>                        Collector<Tuple2<Integer, Integer>> out) throws Exception {
>>>>>>>>
>>>>>>>>       final int NULL_ELEMENT = -1;
>>>>>>>>
>>>>>>>>       for (Tuple2<Integer, String> leftElem : leftElements) {
>>>>>>>>          boolean hadElements = false;
>>>>>>>>          for (Tuple2<Integer, String> rightElem : rightElements) {
>>>>>>>>             out.collect(new Tuple2<Integer, Integer>(leftElem.f0, rightElem.f0));
>>>>>>>>             hadElements = true;
>>>>>>>>          }
>>>>>>>>          if (!hadElements) {
>>>>>>>>             out.collect(new Tuple2<Integer, Integer>(leftElem.f0, NULL_ELEMENT));
>>>>>>>>          }
>>>>>>>>       }
>>>>>>>>
>>>>>>>>    }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Apr 15, 2015 at 11:01 AM, Stephan Ewen <sewen@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I think this may be a great example to add as a utility function.
>>>>>>>>>
>>>>>>>>> Or actually add it as a function to the DataSet, internally realized
>>>>>>>>> as a special case of coGroup.
>>>>>>>>>
>>>>>>>>> We do not have a ready example of that, but it should be
>>>>>>>>> straightforward to realize. Similar to the join, coGroup on the join
>>>>>>>>> keys. Inside the coGroup function, emit the combination of all values
>>>>>>>>> from the two iterators. If one of them is empty (the one that is not
>>>>>>>>> outer), then emit all values from the outer side.
>>>>>>>>>
>>>>>>>>> Greetings,
>>>>>>>>> Stephan
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Apr 15, 2015 at 10:36 AM, Flavio Pompermaier
>>>>>>>>> <pompermaier@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>>> Do you have an already working example of it? :)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 15, 2015 at 10:32 AM, Ufuk Celebi <uce@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 15 Apr 2015, at 10:30, Flavio Pompermaier
>>>>>>>>>>> <pompermaier@okkam.it> wrote:
>>>>>>>>>>>
>>>>>>>>>>> >
>>>>>>>>>>> > Hi to all,
>>>>>>>>>>> > I have to join two datasets, but I'd like to keep all data in
>>>>>>>>>>> > the left one even if there's no matching data in the right one.
>>>>>>>>>>> > How can you achieve that in Flink? Maybe I should use coGroup?
>>>>>>>>>>>
>>>>>>>>>>> Yes, currently you have to implement this manually with a coGroup.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>
>>>
>>>
>>
>
>
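
The D1/D2 to D3 use case from earlier in this thread can also be worked
through in memory. The sketch below is plain Java, not Flink code, and all
names in it are hypothetical. It assumes that p1 and p2 are the property
names selecting the third and fourth output fields, that each D1.f0 has at
most one p1 and one p2 row (as stated in the thread), and that duplicate
D2.f0 keys should yield a single output row, since the desired D3 lists
A,X only once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory illustration of the expected result, not a Flink job.
class Tuple4Sketch {

    static List<List<String>> buildD3(List<String[]> d1, List<String[]> d2) {
        // Distinct join keys on the D2 side (D2.f0).
        Set<String> d2Keys = new HashSet<>();
        for (String[] row : d2) {
            d2Keys.add(row[0]);
        }

        // For each D1.f0, remember the values stored under p1 and p2.
        Map<String, String> p1Values = new HashMap<>();
        Map<String, String> p2Values = new HashMap<>();
        for (String[] row : d1) {
            if ("p1".equals(row[1])) p1Values.put(row[0], row[2]);
            if ("p2".equals(row[1])) p2Values.put(row[0], row[2]);
        }

        // Emit one row per D1 tuple whose f2 references a D2 key.
        List<List<String>> d3 = new ArrayList<>();
        for (String[] row : d1) {
            if (d2Keys.contains(row[2])) {
                d3.add(Arrays.asList(
                        row[0], row[2], p1Values.get(row[0]), p2Values.get(row[0])));
            }
        }
        return d3;
    }

    public static void main(String[] args) {
        List<String[]> d1 = Arrays.asList(
                new String[] {"A", "p1", "a1"},
                new String[] {"A", "p2", "a2"},
                new String[] {"A", "p3", "X"},
                new String[] {"B", "p3", "Y"},
                new String[] {"B", "p1", "b1"});
        List<String[]> d2 = Arrays.asList(
                new String[] {"X", "s", "V"},
                new String[] {"X", "r", "2"},
                new String[] {"Y", "j", "k"});

        for (List<String> row : buildD3(d1, d2)) {
            System.out.println(row);
        }
        // prints:
        // [A, X, a1, a2]
        // [B, Y, b1, null]
    }
}
```

The maps play the role of the local variables discussed above: they are only
safe as long as the data they hold fits in memory.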
