flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Left outer join
Date Fri, 17 Apr 2015 11:11:33 GMT
If you know that the group cardinality of one input is always 1 (or 0) you
can make that input the one to cache in memory and stream the other input
with potentially more group elements.
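A minimal plain-Java sketch of this suggestion, with no Flink dependency. The class and method names (`SingleMatchOuterJoin`, `joinGroup`) are hypothetical, and the two `Iterable`s stand in for one key group of a coGroup. It assumes the right group holds at most one element, so only that element is cached while the left group is streamed in a single pass.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper: plain Java, no Flink dependency.
final class SingleMatchOuterJoin {

    // Left outer join of ONE key group whose right side holds at most one element.
    // Only that single right element is cached; the left group is streamed once.
    static <L, R> List<String> joinGroup(Iterable<L> left, Iterable<R> right) {
        Iterator<R> rightIt = right.iterator();
        R cached = rightIt.hasNext() ? rightIt.next() : null; // 0 or 1 element
        if (rightIt.hasNext()) {
            // The cardinality assumption is violated: fail instead of dropping matches.
            throw new IllegalStateException("right group has more than one element");
        }
        List<String> out = new ArrayList<>();
        for (L l : left) {
            out.add(l + "," + cached); // a null cached element renders as "null"
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(joinGroup(List.of("a", "b"), List.of("X"))); // [a,X, b,X]
        System.out.println(joinGroup(List.of("c"), List.of()));          // [c,null]
    }
}
```

If the at-most-one side is actually the left input, swap the inputs so that the side with group cardinality 0 or 1 is always the cached one.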

2015-04-17 4:09 GMT-05:00 Flavio Pompermaier <pompermaier@okkam.it>:

> That would be very helpful...
>
> Thanks for the support,
> Flavio
>
> On Fri, Apr 17, 2015 at 10:04 AM, Till Rohrmann <till.rohrmann@gmail.com>
> wrote:
>
>> No, it's not, but at the moment there is, AFAIK, no other way around it.
>> There is an issue for proper outer join support [1].
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-687
>>
>> On Fri, Apr 17, 2015 at 10:01 AM, Flavio Pompermaier <
>> pompermaier@okkam.it> wrote:
>>
>>> That could solve the problem, but is it safe to accumulate elements in a
>>> local variable if the datasets are huge?
>>>
>>> On Fri, Apr 17, 2015 at 9:54 AM, Till Rohrmann <till.rohrmann@gmail.com>
>>> wrote:
>>>
>>>> If it's fine to have null string values in the cases where
>>>> D1.f1!="a1" or D1.f1!="a2", then a possible solution could look like this
>>>> (with the Scala API):
>>>>
>>>> import org.apache.flink.api.scala._
>>>> import org.apache.flink.util.Collector
>>>>
>>>> val ds1: DataSet[(String, String, String)] = getDS1
>>>> val ds2: DataSet[(String, String, String)] = getDS2
>>>>
>>>> // coGroup on D1.f2 == D2.f0; D1 groups without a D2 partner are kept
>>>> ds1.coGroup(ds2).where(2).equalTo(0) {
>>>>   (left, right, collector: Collector[(String, String, String, String)]) => {
>>>>     if (right.isEmpty) {
>>>>       // no matching D2 element: emit the D1 element with a null right key
>>>>       left foreach { element =>
>>>>         val value1 = if (element._2 == "a1") element._3 else null
>>>>         val value2 = if (element._2 == "a2") element._3 else null
>>>>         collector.collect((element._1, null, value1, value2))
>>>>       }
>>>>     } else {
>>>>       // the right iterator can be traversed only once, so materialize it
>>>>       val array = right.toArray
>>>>       for (leftElement <- left) {
>>>>         val value1 = if (leftElement._2 == "a1") leftElement._3 else null
>>>>         val value2 = if (leftElement._2 == "a2") leftElement._3 else null
>>>>         for (rightElement <- array) {
>>>>           collector.collect((leftElement._1, rightElement._1, value1, value2))
>>>>         }
>>>>       }
>>>>     }
>>>>   }
>>>> }
>>>>
>>>> Does this solve your problem?
>>>>
>>>> On Fri, Apr 17, 2015 at 9:30 AM, Flavio Pompermaier <
>>>> pompermaier@okkam.it> wrote:
>>>>
>>>>> Hi Till,
>>>>> thanks for the reply.
>>>>> What I'd like to do is merge D1 and D2 if there's a reference from D1 to
>>>>> D2 (D1.f2==D2.f0).
>>>>> If this condition is true, I would like to produce a set of tuples with
>>>>> the matching elements in the first two places (D1.f0, D2.f0) and, in the
>>>>> other two places, the values (if present) of the matching D1 tuples with
>>>>> D1.f1=="a1" and D1.f1=="a2" (string values), respectively.
>>>>> (PS: For each value of D1.f0 there is at most one a1 value and one a2
>>>>> value.)
>>>>>
>>>>> Is that clearer?
>>>>>
>>>>> On Fri, Apr 17, 2015 at 9:03 AM, Till Rohrmann <
>>>>> till.rohrmann@gmail.com> wrote:
>>>>>
>>>>>> Hi Flavio,
>>>>>>
>>>>>> I don't really understand what you are trying to do. What does
>>>>>> D1.f2(D1.f1==p1) mean? What happens if the condition in D1.f2(if
>>>>>> D1.f1==p2) is false?
>>>>>>
>>>>>> Where do the values a1 and a2 in (A, X, a1, a2) come from when you
>>>>>> join [(A, p3, X), (X, s, V)] and [(A, p3, X), (X, r, 2)]? Maybe you can
>>>>>> elaborate a bit more on your example.
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Till
>>>>>>
>>>>>> On Thu, Apr 16, 2015 at 10:09 PM, Flavio Pompermaier <
>>>>>> pompermaier@okkam.it> wrote:
>>>>>>
>>>>>>> I cannot find a solution to my use case :(
>>>>>>> I have two datasets, D1 and D2, like:
>>>>>>>
>>>>>>> D1:
>>>>>>> A,p1,a1
>>>>>>> A,p2,a2
>>>>>>> A,p3,X
>>>>>>> B,p3,Y
>>>>>>> B,p1,b1
>>>>>>>
>>>>>>> D2:
>>>>>>> X,s,V
>>>>>>> X,r,2
>>>>>>> Y,j,k
>>>>>>>
>>>>>>> I'd like to produce a single dataset D3 (Tuple4) like:
>>>>>>>
>>>>>>> A,X,a1,a2
>>>>>>> B,Y,b1,null
>>>>>>>
>>>>>>> Basically, filling it with <D1.f0,D2.f0,D1.f2(D1.f1==p1),D1.f2(if
>>>>>>> D1.f1==p2)> when D1.f2==D2.f0.
>>>>>>> Is that possible, and if so, how?
>>>>>>> Could you show me a simple snippet?
>>>>>>>
>>>>>>> Thanks in advance,
>>>>>>> Flavio
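To make the requested semantics concrete, here is an in-memory plain-Java sketch (not a Flink program; the class `D3Sketch` and its helper type `Subject` are hypothetical) that turns the D1/D2 sample into D3. It assumes that several D2 rows with the same f0 (X,s,V and X,r,2) should yield a single output row, as the expected D3 suggests, and that each D1.f0 has at most one p1 value and one p2 value.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class D3Sketch {

    // Per-subject (D1.f0) state collected from D1 (hypothetical helper type).
    private static final class Subject {
        String p1Value;                              // D1.f2 where D1.f1 == "p1", else null
        String p2Value;                              // D1.f2 where D1.f1 == "p2", else null
        final List<String> refs = new ArrayList<>(); // D1.f2 values that are D2.f0 keys
    }

    static List<String> buildD3(List<String[]> d1, List<String[]> d2) {
        // Distinct D2 keys: D2 may contain several rows per key.
        Set<String> d2Keys = new HashSet<>();
        for (String[] row : d2) {
            d2Keys.add(row[0]);
        }
        // Group D1 by f0, keeping insertion order for readable output.
        Map<String, Subject> subjects = new LinkedHashMap<>();
        for (String[] row : d1) {
            Subject s = subjects.computeIfAbsent(row[0], k -> new Subject());
            if (row[1].equals("p1")) {
                s.p1Value = row[2];
            } else if (row[1].equals("p2")) {
                s.p2Value = row[2];
            }
            if (d2Keys.contains(row[2])) {
                s.refs.add(row[2]); // D1.f2 == D2.f0
            }
        }
        // Emit <D1.f0, D2.f0, p1 value, p2 value> for every reference into D2.
        List<String> d3 = new ArrayList<>();
        for (Map.Entry<String, Subject> e : subjects.entrySet()) {
            Subject s = e.getValue();
            for (String ref : s.refs) {
                d3.add(e.getKey() + "," + ref + "," + s.p1Value + "," + s.p2Value);
            }
        }
        return d3;
    }

    public static void main(String[] args) {
        List<String[]> d1 = List.of(
            new String[] {"A", "p1", "a1"}, new String[] {"A", "p2", "a2"},
            new String[] {"A", "p3", "X"}, new String[] {"B", "p3", "Y"},
            new String[] {"B", "p1", "b1"});
        List<String[]> d2 = List.of(
            new String[] {"X", "s", "V"}, new String[] {"X", "r", "2"},
            new String[] {"Y", "j", "k"});
        buildD3(d1, d2).forEach(System.out::println); // A,X,a1,a2 then B,Y,b1,null
    }
}
```

In Flink, these steps would roughly correspond to a distinct projection of D2.f0, a grouping of D1 on f0, and a join or coGroup between the two; that mapping is an assumption, not something stated in the thread.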
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Apr 16, 2015 at 9:48 PM, Till Rohrmann <trohrmann@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> You can materialize the right input by copying it into an array, for
>>>>>>>> example. Then you can iterate over it as often as you like.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
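A plain-Java sketch of materializing the right input. `OnceIterable` is a hypothetical stand-in for the group Iterables Flink hands to a coGroup function: it throws `IllegalStateException` (rather than Flink's `TraversableOnceException`) on a second `iterator()` call. `leftOuterJoinGroup`, also hypothetical, copies the right group once and then loops over the copy for every left element.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class MaterializeRightSide {

    // Hypothetical stand-in for a coGroup group Iterable:
    // only the first call to iterator() succeeds.
    static final class OnceIterable<T> implements Iterable<T> {
        private final List<T> data;
        private boolean used = false;

        OnceIterable(List<T> data) {
            this.data = data;
        }

        @Override
        public Iterator<T> iterator() {
            if (used) {
                throw new IllegalStateException("Iterable can be iterated over only once");
            }
            used = true;
            return data.iterator();
        }
    }

    // Left outer join of one key group: copy the right group into a list once,
    // then re-iterate the copy for every left element.
    static <L, R> List<String> leftOuterJoinGroup(Iterable<L> left, Iterable<R> right) {
        List<R> rightCopy = new ArrayList<>();
        for (R r : right) {
            rightCopy.add(r); // the single allowed traversal of the right input
        }
        List<String> out = new ArrayList<>();
        for (L l : left) {
            if (rightCopy.isEmpty()) {
                out.add(l + ",null"); // no partner: keep the left element
            }
            for (R r : rightCopy) {
                out.add(l + "," + r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Iterable<String> left = new OnceIterable<>(List.of("1", "2"));
        Iterable<String> right = new OnceIterable<>(List.of("x", "y"));
        System.out.println(leftOuterJoinGroup(left, right)); // [1,x, 1,y, 2,x, 2,y]
    }
}
```

Because a coGroup function is called once per key group, the copy holds only the right-side elements of the current key, not the whole dataset; a single very large group is still buffered in full, which is where choosing the smaller side to materialize matters.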
>>>>>>>> On Apr 16, 2015 7:37 PM, "Flavio Pompermaier" <pompermaier@okkam.it>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Maximilian,
>>>>>>>>> I tried your solution but it doesn't work because the
>>>>>>>>> rightElements iterator cannot be used more than once:
>>>>>>>>>
>>>>>>>>> Caused by: org.apache.flink.util.TraversableOnceException: The
>>>>>>>>> Iterable can be iterated over only once. Only the first call to
>>>>>>>>> 'iterator()' will succeed.
>>>>>>>>>
>>>>>>>>> On Wed, Apr 15, 2015 at 12:59 PM, Maximilian Michels <mxm@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Flavio,
>>>>>>>>>>
>>>>>>>>>> Here's a simple example of a Left Outer Join:
>>>>>>>>>> https://gist.github.com/mxm/c2e9c459a9d82c18d789
>>>>>>>>>>
>>>>>>>>>> As Stephan pointed out, this can easily be modified to construct a
>>>>>>>>>> Right Outer Join (just exchange leftElements and rightElements in the
>>>>>>>>>> two loops).
>>>>>>>>>>
>>>>>>>>>> Here's an excerpt with the most important part, the coGroup
>>>>>>>>>> function:
>>>>>>>>>>
>>>>>>>>>> import org.apache.flink.api.common.functions.CoGroupFunction;
>>>>>>>>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>>>>>>>> import org.apache.flink.util.Collector;
>>>>>>>>>>
>>>>>>>>>> public static class LeftOuterJoin implements CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, Integer>> {
>>>>>>>>>>
>>>>>>>>>>    @Override
>>>>>>>>>>    public void coGroup(Iterable<Tuple2<Integer, String>> leftElements,
>>>>>>>>>>                        Iterable<Tuple2<Integer, String>> rightElements,
>>>>>>>>>>                        Collector<Tuple2<Integer, Integer>> out) throws Exception {
>>>>>>>>>>
>>>>>>>>>>       // placeholder emitted when a left element has no partner on the right
>>>>>>>>>>       final int NULL_ELEMENT = -1;
>>>>>>>>>>
>>>>>>>>>>       for (Tuple2<Integer, String> leftElem : leftElements) {
>>>>>>>>>>          boolean hadElements = false;
>>>>>>>>>>          // NOTE: this re-iterates rightElements for every left element; Flink's
>>>>>>>>>>          // group Iterables can be traversed only once (TraversableOnceException)
>>>>>>>>>>          for (Tuple2<Integer, String> rightElem : rightElements) {
>>>>>>>>>>             out.collect(new Tuple2<Integer, Integer>(leftElem.f0, rightElem.f0));
>>>>>>>>>>             hadElements = true;
>>>>>>>>>>          }
>>>>>>>>>>          if (!hadElements) {
>>>>>>>>>>             out.collect(new Tuple2<Integer, Integer>(leftElem.f0, NULL_ELEMENT));
>>>>>>>>>>          }
>>>>>>>>>>       }
>>>>>>>>>>
>>>>>>>>>>    }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 15, 2015 at 11:01 AM, Stephan Ewen <sewen@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I think this may be a great example to add as a utility function.
>>>>>>>>>>>
>>>>>>>>>>> Or actually, add it as a function to the DataSet, internally
>>>>>>>>>>> realized as a special case of coGroup.
>>>>>>>>>>>
>>>>>>>>>>> We do not have a ready example of that, but it should be
>>>>>>>>>>> straightforward to realize. As for the join, coGroup on the join
>>>>>>>>>>> keys. Inside the coGroup function, emit the combination of all values
>>>>>>>>>>> from the two iterators. If one of them is empty (the one that is not
>>>>>>>>>>> outer), then emit all values from the outer side.
>>>>>>>>>>>
>>>>>>>>>>> Greetings,
>>>>>>>>>>> Stephan
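The coGroup recipe described here (group both inputs on the join keys, emit all combinations per key, and fall back to the outer side when the other side is empty) can be simulated in plain Java. This is an illustration under stated assumptions, not Flink code: `CoGroupLeftOuterJoin` and its methods are hypothetical, `TreeMap` grouping stands in for Flink's key grouping, and the keys are `Integer`s as in Maximilian's example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

final class CoGroupLeftOuterJoin {

    // Simulated coGroup: for every key present in either input, hand both
    // value groups (either may be empty) to the per-key function.
    static List<String> leftOuterJoin(List<Map.Entry<Integer, String>> left,
                                      List<Map.Entry<Integer, String>> right) {
        Map<Integer, List<String>> leftGroups = groupByKey(left);
        Map<Integer, List<String>> rightGroups = groupByKey(right);
        TreeSet<Integer> keys = new TreeSet<>(leftGroups.keySet());
        keys.addAll(rightGroups.keySet());
        List<String> out = new ArrayList<>();
        for (Integer key : keys) {
            coGroup(leftGroups.getOrDefault(key, List.of()),
                    rightGroups.getOrDefault(key, List.of()), out);
        }
        return out;
    }

    // Per-key rule: all combinations of both sides; if the non-outer (right)
    // side is empty, emit every left value with a null partner. Keys present
    // only on the right produce nothing, because the left group is empty.
    static void coGroup(List<String> l, List<String> r, List<String> out) {
        for (String lv : l) {
            if (r.isEmpty()) {
                out.add(lv + ",null");
            }
            for (String rv : r) {
                out.add(lv + "," + rv);
            }
        }
    }

    private static Map<Integer, List<String>> groupByKey(List<Map.Entry<Integer, String>> input) {
        Map<Integer, List<String>> groups = new TreeMap<>();
        for (Map.Entry<Integer, String> e : input) {
            groups.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> left = List.of(Map.entry(1, "a"), Map.entry(2, "b"));
        List<Map.Entry<Integer, String>> right = List.of(Map.entry(1, "x"), Map.entry(3, "z"));
        System.out.println(leftOuterJoin(left, right)); // [a,x, b,null]
    }
}
```

Swapping the two arguments gives a right outer join (with the output columns swapped), which mirrors the suggestion to exchange leftElements and rightElements in Maximilian's function.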
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Apr 15, 2015 at 10:36 AM, Flavio Pompermaier <pompermaier@okkam.it>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Do you already have a working example of it? :)
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Apr 15, 2015 at 10:32 AM, Ufuk Celebi <uce@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 15 Apr 2015, at 10:30, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Hi to all,
>>>>>>>>>>>>> > I have to join two datasets, but I'd like to keep all the data on
>>>>>>>>>>>>> > the left side even if there is no matching element on the right.
>>>>>>>>>>>>> > How can I achieve that in Flink? Maybe I should use coGroup?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, currently you have to implement this manually with a
>>>>>>>>>>>>> coGroup.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>
