flink-dev mailing list archives

From Fabian Hueske <fhue...@apache.org>
Subject Re: Need some help to implement the outer join operator
Date Mon, 22 Dec 2014 13:23:03 GMT
Hi Wilson,

The OuterJoinMapFunction can be implemented just as Chesnay described.
Initialise it with a JoinFunction object in the constructor and call the
JoinFunction's join() method in map().
You can also pass the JoinFunction directly to the CoGroupFunction and call
it in coGroup() instead of building the Tuple2<In1, In2> pairs. Then you do
not need the MapFunction, and things might be easier and cleaner.
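A minimal sketch of the simpler variant, assuming a left outer join (every element of the first input is kept): the CoGroupFunction calls the user's JoinFunction itself, so no separate MapFunction is needed. JoinFunction and Collector below are simplified stand-ins so the code compiles without Flink; the real interfaces are part of Flink's API, declare `throws Exception`, and Flink's Collector also has `close()`. The class name DirectOuterJoinCoGroup is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in mirroring the shape of Flink's JoinFunction
// (the real join() declares "throws Exception").
interface JoinFunction<IN1, IN2, OUT> {
    OUT join(IN1 first, IN2 second);
}

// Simplified stand-in mirroring Flink's Collector (the real one also has close()).
interface Collector<T> {
    void collect(T record);
}

// Hypothetical left outer join: every element of the first group is emitted,
// joined with each element of the second group, or with null if that group is empty.
class DirectOuterJoinCoGroup<IN1, IN2, OUT> {
    private final JoinFunction<IN1, IN2, OUT> joinFunction;

    DirectOuterJoinCoGroup(JoinFunction<IN1, IN2, OUT> joinFunction) {
        this.joinFunction = joinFunction;
    }

    // Same shape as CoGroupFunction.coGroup(Iterable, Iterable, Collector).
    public void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<OUT> out) {
        // Buffer the second group in a List (not a Set) so duplicates survive;
        // the group iterables are treated as single-pass.
        List<IN2> rights = new ArrayList<>();
        for (IN2 r : second) {
            rights.add(r);
        }
        for (IN1 l : first) {
            if (rights.isEmpty()) {
                out.collect(joinFunction.join(l, null)); // no match: pad with null
            } else {
                for (IN2 r : rights) {
                    out.collect(joinFunction.join(l, r)); // product within the group
                }
            }
        }
    }
}
```

Because the join function is applied inside coGroup(), the intermediate Tuple2 pairs never have to be materialized, which is what makes this variant "easier and cleaner".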

The code of the OuterJoinCoGroupFunction looks good, except that you should
use an ArrayList<Tuple2<String, Integer>> rather than a HashSet<Integer>,
in case the first input contains duplicate values (a set keeps only one copy
of each value, so output pairs would be lost).
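To illustrate the duplicate problem, here is a sketch of Wilson's function with the suggested change: the first group is buffered as whole tuples in an ArrayList, so two elements with the same value each yield an output pair, whereas a HashSet<Integer> would collapse them into one. Tuple2 and Collector are simplified stand-ins for Flink's classes so the snippet compiles on its own, and the class only has the same method shape as Flink's CoGroupFunction rather than implementing it. The null padding for an empty first group follows Wilson's original logic.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in mirroring Flink's Tuple2 (public fields f0, f1).
class Tuple2<T0, T1> {
    public T0 f0;
    public T1 f1;

    Tuple2(T0 f0, T1 f1) {
        this.f0 = f0;
        this.f1 = f1;
    }

    @Override
    public String toString() {
        return "(" + f0 + "," + f1 + ")";
    }
}

// Simplified stand-in mirroring Flink's Collector.
interface Collector<T> {
    void collect(T record);
}

// Wilson's function with the suggested fix: buffer whole first-input tuples
// in an ArrayList, so duplicate values each produce their own output pair.
class OuterJoinCoGroupFunction {
    public void coGroup(Iterable<Tuple2<String, Integer>> iVals,
                        Iterable<Tuple2<String, Double>> dVals,
                        Collector<Tuple2<Integer, Double>> out) {
        List<Tuple2<String, Integer>> ints = new ArrayList<>();
        for (Tuple2<String, Integer> val : iVals) {
            ints.add(val);
        }
        for (Tuple2<String, Double> val : dVals) {
            if (ints.isEmpty()) {
                // No partner in the first input: pad with null.
                out.collect(new Tuple2<Integer, Double>(null, val.f1));
            } else {
                for (Tuple2<String, Integer> i : ints) {
                    out.collect(new Tuple2<Integer, Double>(i.f1, val.f1));
                }
            }
        }
    }
}
```

With the first group holding the value 7 twice and the second group holding 1.5 once, this version emits two (7,1.5) pairs; the HashSet version emits only one.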

Let me know if you have any other questions.
Cheers, Fabian



2014-12-21 14:38 GMT+01:00 Chesnay Schepler <chesnay.schepler@fu-berlin.de>:

> Hey Wilson,
>
> The MapFunction should act as a wrapper for the join function. Create a
> class extending RichMapFunction and pass the join function via the
> constructor. Then delegate the open/close calls to it, with the map
> function looking something like this:
>
> map(Tuple2<...> tuple) {
>     return joinFunction.join(tuple.f0, tuple.f1);
> }
>
> Regards,
> Chesnay
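A sketch of the wrapper described above, under stated assumptions: in real Flink code the class would extend RichMapFunction<Tuple2<IN1, IN2>, OUT> and override open(Configuration) and close(). Here a hypothetical Lifecycle interface and simplified JoinFunction/Tuple2 stand-ins take their place so the snippet compiles without Flink. Forwarding open/close only when the wrapped function implements the lifecycle interface is one possible design choice, not something the thread specifies.

```java
// Simplified stand-in for Flink's JoinFunction (the real join() declares
// "throws Exception").
interface JoinFunction<IN1, IN2, OUT> {
    OUT join(IN1 first, IN2 second);
}

// Hypothetical lifecycle interface standing in for Flink's RichFunction,
// whose open() takes a Configuration argument.
interface Lifecycle {
    void open();
    void close();
}

// Stand-in mirroring Flink's Tuple2 (public fields f0 and f1).
class Tuple2<T0, T1> {
    public T0 f0;
    public T1 f1;

    Tuple2(T0 f0, T1 f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

// Wraps the user's JoinFunction: map() unpacks the pair built by the CoGroup,
// and lifecycle calls are forwarded if the wrapped function needs them.
class OuterJoinMapFunction<IN1, IN2, OUT> implements Lifecycle {
    private final JoinFunction<IN1, IN2, OUT> joinFunction;

    OuterJoinMapFunction(JoinFunction<IN1, IN2, OUT> joinFunction) {
        this.joinFunction = joinFunction;
    }

    @Override
    public void open() {
        if (joinFunction instanceof Lifecycle) {
            ((Lifecycle) joinFunction).open();
        }
    }

    @Override
    public void close() {
        if (joinFunction instanceof Lifecycle) {
            ((Lifecycle) joinFunction).close();
        }
    }

    public OUT map(Tuple2<IN1, IN2> tuple) {
        // A null field means the element had no join partner.
        return joinFunction.join(tuple.f0, tuple.f1);
    }
}
```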
>
> On 21.12.2014 14:11, Zihong Cao wrote:
> > Hi Fabian,
> >
> > Your response is very helpful! To make sure I understand correctly, I am
> > putting my pseudo-code here first:
> >
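> > import java.util.HashSet;
> > import java.util.Set;
> >
> > import org.apache.flink.api.common.functions.CoGroupFunction;
> > import org.apache.flink.api.java.tuple.Tuple2;
> > import org.apache.flink.util.Collector;
> >
> > // Pairs every (String, Double) element with the Integer values of the
> > // matching (String, Integer) group, or with null if that group is empty.
> > class OuterJoinCoGroupFunction implements CoGroupFunction<Tuple2<String, Integer>,
> >         Tuple2<String, Double>, Tuple2<Integer, Double>> {
> >
> >     @Override
> >     public void coGroup(Iterable<Tuple2<String, Integer>> iVals,
> >             Iterable<Tuple2<String, Double>> dVals,
> >             Collector<Tuple2<Integer, Double>> out) {
> >         Set<Integer> ints = new HashSet<Integer>();
> >
> >         for (Tuple2<String, Integer> val : iVals) {
> >             ints.add(val.f1);
> >         }
> >
> >         // No matching element in the first input: use null as a placeholder.
> >         if (ints.isEmpty()) {
> >             ints.add(null);
> >         }
> >
> >         for (Tuple2<String, Double> val : dVals) {
> >             for (Integer i : ints) {
> >                 out.collect(new Tuple2<Integer, Double>(i, val.f1));
> >             }
> >         }
> >     }
> > }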
> > The code above tries to build the matching pairs, and if one of the groups
> > is empty, I add a null value to it. However, I don't really understand
> > how to implement the OuterJoinMapFunction.
> >
> > I am also puzzled about how Reduce/GroupReduce is translated into
> > Map -> Reduce. Where can I find material or source code about this?
> >
> > Best,
> > Wilson.
> >
> >> On 15 Dec 2014, at 17:17, Fabian Hueske <fhueske@apache.org> wrote:
> >>
> >> That's a good point.
> >>
> >> You can implement an outer join using the available runtime. This way
> >> you do not need to touch the optimizer and runtime, only the API layer.
> >> This basically means adding syntactic sugar to the available API. The
> >> API will translate the outer join into a CoGroup, which builds all pairs
> >> of joining elements, and a Map, which applies the join function to each
> >> joined pair.
> >>
> >> It could look like this:
> >>
> >> DataSet<TypeX> in1;
> >> DataSet<TypeY> in2;
> >> in1.outerJoin(in2).where(...).equalTo(...).with(new MyJoinFunction());
> >>
> >> which would be translated into
> >>
> >> in1.coGroup(in2).where(...).equalTo(...).with(new OuterJoinCoGroupFunction())
> >>     .map(new OuterJoinMapFunction(new MyJoinFunction()));
> >>
> >> OuterJoinCoGroupFunction (OJCoGroupFunction) and OuterJoinMapFunction
> >> (OJMapFunction) are the functions you need to implement.
> >> OJCoGroupFunction does what Stephan described (it builds pairs of matching
> >> elements) and emits them as Tuple2<TypeX, TypeY>.
> >> OJMapFunction unpacks each Tuple2<TypeX, TypeY> and calls the user's join
> >> function (MyJoinFunction).
> >>
> >> A few operators are already implemented this way. For example, have a
> >> look at Reduce/GroupReduce with KeySelectors, which are translated into
> >> Map -> Reduce (or Map -> GroupReduce).
> >>
> >> Let us know if you have any questions!
> >>
> >> Cheers, Fabian
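The proposed translation (a CoGroup that builds pairs, followed by a Map that applies the join function) can be simulated in memory to see how the two steps fit together. This is an illustrative sketch, not Flink code: java.util.function.Function stands in for the where/equalTo key selectors, BiFunction for MyJoinFunction, a LinkedHashMap plays the role of the CoGroup's grouping by key, and the names OuterJoinTranslationSketch and leftOuterJoin are hypothetical. It assumes a left outer join.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// In-memory simulation of the proposed translation (hypothetical, not a Flink API):
//   in1.outerJoin(in2).where(k1).equalTo(k2).with(joinFn)
// becomes a pair-building CoGroup step followed by a Map step applying joinFn.
class OuterJoinTranslationSketch {

    // Groups one input by key, preserving encounter order.
    static <T, K> Map<K, List<T>> groupBy(List<T> input, Function<T, K> key) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T t : input) {
            groups.computeIfAbsent(key.apply(t), k -> new ArrayList<>()).add(t);
        }
        return groups;
    }

    static <X, Y, K, OUT> List<OUT> leftOuterJoin(List<X> in1, List<Y> in2,
            Function<X, K> key1, Function<Y, K> key2, BiFunction<X, Y, OUT> joinFn) {
        Map<K, List<X>> g1 = groupBy(in1, key1);
        Map<K, List<Y>> g2 = groupBy(in2, key2);

        // Step 1 (CoGroup): per key, build all (x, y) pairs; pad y with null if no match.
        // Keys present only in in2 produce nothing in a left outer join, so they are skipped.
        List<Map.Entry<X, Y>> pairs = new ArrayList<>();
        for (Map.Entry<K, List<X>> e : g1.entrySet()) {
            List<Y> matches = g2.getOrDefault(e.getKey(), new ArrayList<>());
            for (X x : e.getValue()) {
                if (matches.isEmpty()) {
                    pairs.add(new SimpleEntry<X, Y>(x, null));
                } else {
                    for (Y y : matches) {
                        pairs.add(new SimpleEntry<X, Y>(x, y));
                    }
                }
            }
        }

        // Step 2 (Map): unpack each pair and call the user's join function.
        List<OUT> result = new ArrayList<>();
        for (Map.Entry<X, Y> p : pairs) {
            result.add(joinFn.apply(p.getKey(), p.getValue()));
        }
        return result;
    }
}
```

Keeping the two steps separate mirrors the CoGroup + Map plan; collapsing them into one step is the "call the JoinFunction inside coGroup()" simplification suggested later in the thread.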
> >>
> >>
> >> 2014-12-13 16:25 GMT+01:00 Stephan Ewen <sewen@apache.org>:
> >>> Hi Wilson!
> >>>
> >>> You can start by mocking an outer join operator using a special CoGroup
> >>> function. If one of the two sides of a group is empty, you need to
> >>> append null values. Otherwise, you build the Cartesian product within
> >>> the group.
> >>>
> >>> For a proper through-the-stack implementation (not sure if that is
> >>> needed, but it may be nice to have), have a look here:
> >>>
> >>> http://flink.incubator.apache.org/docs/0.7-incubating/internal_add_operator.html
> >>>
> >>> Greetings,
> >>> Stephan
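The per-group rule described above can be written as a standalone function, sketched here for a full outer join: pad with null when either side of the group is empty, otherwise emit the Cartesian product of the two sides. FullOuterCoGroupSketch is a hypothetical name, and returning a list of (left, right) entries instead of writing to a Flink Collector is a simplification.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: the per-key logic of a full outer join as a CoGroup
// would run it for one group. Returns (left, right) pairs; null marks a missing side.
class FullOuterCoGroupSketch {
    static <L, R> List<Map.Entry<L, R>> coGroup(Iterable<L> leftGroup, Iterable<R> rightGroup) {
        List<L> lefts = new ArrayList<>();
        for (L l : leftGroup) {
            lefts.add(l);
        }
        List<R> rights = new ArrayList<>();
        for (R r : rightGroup) {
            rights.add(r);
        }

        List<Map.Entry<L, R>> out = new ArrayList<>();
        if (rights.isEmpty()) {
            // Right side empty: pad each left element with null.
            for (L l : lefts) {
                out.add(new SimpleEntry<L, R>(l, null));
            }
        } else if (lefts.isEmpty()) {
            // Left side empty: pad each right element with null.
            for (R r : rights) {
                out.add(new SimpleEntry<L, R>(null, r));
            }
        } else {
            // Both sides present: Cartesian product within the group.
            for (L l : lefts) {
                for (R r : rights) {
                    out.add(new SimpleEntry<L, R>(l, r));
                }
            }
        }
        return out;
    }
}
```

Dropping either padding branch turns this into a left or right outer join; Wilson's function later in the thread implements only the "left side empty" branch.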
> >>>
> >>>
> >>> On Sat, Dec 13, 2014 at 3:19 AM, Zihong Cao <wilsoncao01@gmail.com> wrote:
> >>>> Hi,
> >>>>
> >>>> I am trying to pick up the outer join operator. However, as Fabian
> >>>> mentioned to me, this task requires touching many different components
> >>>> of the system, so it would be a challenging job for me. Therefore I
> >>>> would need some help :-)
> >>>>
> >>>> I might need to walk through some components like the Compiler/Optimizer
> >>>> and the Runtime (as Fabian mentioned to me), so where should I start to
> >>>> get familiar with them?
> >>>> One more thing: is the outer join operator implementation similar to
> >>>> that of the regular join operator?
> >>>>
> >>>> Best,
> >>>> Wilson Cao
> >
>
>
