flink-dev mailing list archives

From Leonidas Fegaras <fega...@cse.uta.edu>
Subject Re: Problem with groupBy over custom types
Date Thu, 31 Jul 2014 14:54:29 GMT
Hi Stephan,
Thank you for fixing this so fast (given that you are very busy 
preparing your first release).
Maybe I should explain why I need to work on GenericTypes.
I am trying to make Apache MRQL run on Flink. MRQL is a query processing 
and optimization system for large-scale, distributed data analysis, 
built on top of Apache Hadoop, Hama, and Spark. MRQL queries are 
SQL-like but not SQL: they can work on complex nested data (JSON, XML, 
etc.) and can express complex computations (PageRank, matrix 
factorization, etc.).
Let me make this clear first: Flink doesn't really need a query 
language. Flink programs are already query-like, because operations are 
collected and optimized; this gives Flink an edge over Spark. The reason 
I want to port MRQL to Flink is for the benefit of our project only: we 
want our queries to run on multiple platforms, so that users can play 
and experiment with these systems without having to learn their APIs 
and without changing the queries. So I am not interested in the Flink 
optimizations (which is a shame, I know), since our system has its own 
optimizer (which is currently not cost-based).
So, to make a long story short, the MRQL data model is like Avro, since 
it must support complex types. So the getKey methods must map an 
Avro-like object to another Avro-like object (the key); it doesn't mean 
that the key is the same as the value. It's fully understandable (and 
expected) that I will not be able to benefit much from the Flink 
optimizer on GenericTypes.
Anyway, I am in the process of learning Flink and I will probably bother 
you with more questions later (but I will wait for the first Flink 
release first, since it will keep you busy for a while).
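To make the key-mapping idea concrete, here is a minimal sketch of what such a getKey looks like. MRData, its fields, and the KeySketch class are hypothetical stand-ins for MRQL's actual classes, not its real API; the point is only that the key is itself a generic object derived from the value:

```java
import java.util.Objects;

// Hypothetical stand-in for an MRQL-style generic value: a record
// whose fields would themselves be generic values in the real system.
class MRData {
    final String name;
    final int count;
    MRData(String name, int count) { this.name = name; this.count = count; }
    @Override public boolean equals(Object o) {
        return o instanceof MRData
            && ((MRData) o).name.equals(name)
            && ((MRData) o).count == count;
    }
    @Override public int hashCode() { return Objects.hash(name, count); }
}

public class KeySketch {
    // The key is another MRData-like object derived from the value,
    // not necessarily the value itself.
    static MRData getKey(MRData value) {
        return new MRData(value.name, 0);  // project out the grouping part
    }

    public static void main(String[] args) {
        MRData a = new MRData("x", 1);
        MRData b = new MRData("x", 2);
        // Two distinct values map to the same key:
        System.out.println(getKey(a).equals(getKey(b)));  // prints true
    }
}
```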
Thanks for your help

On 07/30/2014 08:09 PM, Stephan Ewen wrote:
> Addendum: I made a quick fix for the POJOs to support the expression 
> keys for reducers. The example from the above mail works with the code 
> in this branch: 
> https://github.com/StephanEwen/incubator-flink/commits/pojofix
> On Thu, Jul 31, 2014 at 3:06 AM, Stephan Ewen <sewen@apache.org> wrote:
>     Yep, there is a flaw in the POJO code: it incorrectly replaces the
>     GenericTypes.
>     @Leonidas: To explain what is going on:
>       - Flink determines the types of the functions via reflection and
>     builds up its own model for representing them at runtime. It handles
>     basic types (int, String, double, ...), arrays, and tuples in a
>     special way; the rest is treated in a generic fashion. The FData
>     class is such a "generic" type in Flink.
>       - We recently added experimental code to analyze those types and
>     represent their contained fields in a transparent way. These are
>     the POJO types; the code at the bottom illustrates what they can do.
>     They replace the "generic types" at several levels, but their
>     implementation is currently incomplete.
>     A few remarks on how to best use the system:
>       - Flink always keeps data in a serialized form. That allows it
>     to operate very robustly with respect to memory pressure, spilling,
>     etc.
>       - One way we gain efficiency is by accessing, within that binary
>     data, only what is really necessary (only the parts that make up
>     the key, for example).
>       - If you define the entire object to be the key, you prevent the
>     system from doing these optimizations. If you can instead declare
>     which part is the key, you allow the system to operate more
>     efficiently.
>     ===================================
>     Example of POJOs and Expression Fields
>
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env =
>             ExecutionEnvironment.getExecutionEnvironment();
>         DataSet<FData> data = env.fromElements(
>             new FData("some"), new FData("test"), new FData("POJOs"));
>         data.groupBy("theString").reduceGroup(new GroupbyReducer()).print();
>         env.execute();
>     }
>
>     public static class FData implements Serializable, Comparable<FData> {
>         public String theString;
>
>         public FData() {
>             theString = "";
>         }
>
>         public FData(String data) {
>             this.theString = data;
>         }
>
>         @Override
>         public int compareTo(FData x) {
>             return theString.compareTo(x.theString);
>         }
>
>         @Override
>         public String toString() {
>             return theString;
>         }
>     }
>
>     public static final class GroupbyReducer extends
>             GroupReduceFunction<FData, FData> {
>         @Override
>         public void reduce(Iterator<FData> values, Collector<FData> out) {
>             while (values.hasNext()) {
>                 out.collect(values.next());
>             }
>         }
>     }
