flink-dev mailing list archives

From Zihong Cao <wilsonca...@gmail.com>
Subject Re: Need some help to implement the outer join operator
Date Sun, 21 Dec 2014 13:11:51 GMT
Hi Fabian,

Your response is very helpful! To make sure I understand it correctly, here is my
pseudo-code first:

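import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

class OuterJoinCoGroupFunction implements CoGroupFunction<Tuple2<String, Integer>,
        Tuple2<String, Double>, Tuple2<Integer, Double>> {
    public void coGroup(Iterable<Tuple2<String, Integer>> iVals,
            Iterable<Tuple2<String, Double>> dVals, Collector<Tuple2<Integer, Double>> out) {
        // Buffer the left side; a List keeps duplicates that a Set would drop.
        List<Integer> ints = new ArrayList<Integer>();
        for (Tuple2<String, Integer> val : iVals) { ints.add(val.f1); }
        if (ints.isEmpty()) { ints.add(null); } // empty left group: pad with null
        boolean rightEmpty = true;
        for (Tuple2<String, Double> val : dVals) {
            rightEmpty = false;
            for (Integer i : ints) { out.collect(new Tuple2<Integer, Double>(i, val.f1)); }
        }
        if (rightEmpty) { // empty right group: pad with null
            for (Integer i : ints) { out.collect(new Tuple2<Integer, Double>(i, null)); }
        }
    }
}
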
The code above tries to build the matching pairs, and if one of the groups is empty, it appends
a NULL value in its place. However, I don't really understand how to implement the OuterJoinMapFunction.
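
For illustration, here is a minimal sketch of what an OuterJoinMapFunction could look like, based on Fabian's description below (unpack the pair, call the user's join function). It is not taken from the Flink code base. Pair, MapFn and JoinFn are hypothetical, simplified stand-ins for Flink's Tuple2, MapFunction and JoinFunction so that the snippet compiles on its own; the real interfaces also declare `throws Exception`.

```java
// Stand-alone sketch: Pair, MapFn and JoinFn are simplified stand-ins
// (assumptions) for Flink's Tuple2, MapFunction and JoinFunction.
final class OuterJoinMapSketch {

    // Holds one joined pair; either side may be null for an unmatched row.
    static final class Pair<L, R> {
        final L left;
        final R right;

        Pair(L left, R right) {
            this.left = left;
            this.right = right;
        }
    }

    interface MapFn<I, O> {
        O map(I value);
    }

    interface JoinFn<L, R, O> {
        O join(L left, R right);
    }

    // Unpacks each pair emitted by the CoGroup step and hands both sides
    // to the user's join function, which has to tolerate null arguments.
    static final class OuterJoinMapFunction<L, R, O> implements MapFn<Pair<L, R>, O> {
        private final JoinFn<L, R, O> userJoin;

        OuterJoinMapFunction(JoinFn<L, R, O> userJoin) {
            this.userJoin = userJoin;
        }

        @Override
        public O map(Pair<L, R> pair) {
            return userJoin.join(pair.left, pair.right);
        }
    }
}
```

The map function itself contains no join logic. It only adapts the pair-shaped output of the CoGroup step to the two-argument signature of the user's function, so all outer-join handling stays in the CoGroup function.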

I am also puzzled about how the Reduce/GroupReduce is translated into Map -> Reduce. Where
can I find materials or the source code for this?
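
As a rough conceptual sketch of the Map -> Reduce idea (an assumption about the general pattern, not Flink's actual translation code): a Map step attaches the key extracted by the key selector to each element, the Reduce step groups on that key field, and the key is dropped again at the end. Plain Java collections stand in for DataSets here, and the class and method names are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Conceptual sketch only: plain collections stand in for DataSets, and
// none of these names are taken from the Flink code base.
final class KeySelectorReduceSketch {

    static <T, K> List<T> reduceByKeySelector(
            List<T> input, Function<T, K> keySelector, BinaryOperator<T> reducer) {
        // Step 1 (Map): attach the extracted key to every element.
        List<Map.Entry<K, T>> keyed = new ArrayList<>();
        for (T element : input) {
            keyed.add(new AbstractMap.SimpleEntry<>(keySelector.apply(element), element));
        }
        // Step 2 (Reduce): group on the key field and combine the values.
        Map<K, T> reduced = new LinkedHashMap<>();
        for (Map.Entry<K, T> entry : keyed) {
            reduced.merge(entry.getKey(), entry.getValue(), reducer);
        }
        // Step 3: drop the key again so the caller sees plain elements.
        return new ArrayList<>(reduced.values());
    }
}
```

For example, reducing the strings "apple", "avocado" and "banana" with their first character as the key would combine the first two and leave "banana" on its own.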


> On Dec 15, 2014, at 5:17 PM, Fabian Hueske <fhueske@apache.org> wrote:
> That's a good point.
> You can implement an outer join using the available runtime. This way you
> do not need to touch the optimizer and runtime but only the API layer.
> This basically means to add syntactic sugar to the available API. The API
> will translate the outer join into a CoGroup which builds all pairs of
> joining elements and a Map which applies the join function to each joined
> pair.
> It could look like this:
> DataSet<TypeX> in1;
> DataSet<TypeY> in2;
> in1.outerJoin(in2).where(...).equalTo(...).with(new MyJoinFunction());
> which would be translated into
> in1.coGroup(in2).where(...).equalTo(...).with(new
> OuterJoinCoGroupFunction()).map(new OuterJoinMapFunction(new MyJoinFunction()));
> OJCoGroupFunction and OJMapFunction are functions that you need to
> implement.
> OJCoGroupFunction does what Stephan said (it builds pairs of matching
> elements) and returns a Tuple2<TypeX, TypeY>.
> OJMapFunction unpacks the Tuple2<TypeX, TypeY> and calls the user's Join
> function (MyJoinFunction).
> There are a few operators implemented this way. For example, have a look at
> the Reduce/GroupReduce with KeySelectors, which are translated into Map ->
> Reduce (or Map -> GroupReduce).
> Let us know if you have any questions!
> Cheers, Fabian
> 2014-12-13 16:25 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>> Hi Wilson!
>> You can start by mocking an outer join operator using a special CoGroup
>> function. If one of the two sides for a group is empty, you have the case
>> where you need to append null values. Otherwise, you build the Cartesian
>> product within the group.
>> For a proper through-the-stack implementation (not sure if that is needed,
>> but may be nice to have), have a look here:
>> http://flink.incubator.apache.org/docs/0.7-incubating/internal_add_operator.html
>> Greetings,
>> Stephan
>> On Sat, Dec 13, 2014 at 3:19 AM, Zihong Cao <wilsoncao01@gmail.com> wrote:
>>> Hi,
>>> I am trying to pick up the outer join operator. However, as Fabian
>>> mentioned to me, this task would require touching many different
>>> components of the system, so it would be a challenging job for me. Therefore, I
>>> would need some help :-)
>>> I might need to walk through some components like the Compiler/Optimizer and
>>> Runtime (as Fabian mentioned to me), so where should I start to get
>>> familiar with them?
>>> One more thing, is the outer join operator implementation similar to the
>>> pure join operator?
>>> Best,
>>> Wilson Cao
