flink-dev mailing list archives

From Leonidas Fegaras <fega...@cse.uta.edu>
Subject Problem with groupBy over custom types
Date Wed, 30 Jul 2014 21:14:49 GMT
Hi,
I am trying to do a groupBy over a DataSet with a custom type (not a Tuple):

public class FData implements Serializable, Comparable<FData> {
     public ... data;
     public FData () { ... }
     @Override
     public int compareTo ( FData x ) { return data.compareTo(x.data); }
...
}

The map and flatMap methods work fine on DataSet<FData>, but I have a problem with the following groupBy code:

s.groupBy(new GroupbyKey()).reduceGroup(new GroupbyReducer());

where s is a DataSet<FData> and the classes are defined as follows:

public static final class GroupbyKey extends KeySelector<FData,FData> {
    @Override
    public FData getKey ( FData value ) { return value; }
}
public static final class GroupbyReducer extends GroupReduceFunction<FData,FData> {
    @Override
    public void reduce ( final Iterator<FData> values, Collector<FData> out ) {}
}
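The optimizer picks a sort-based strategy for this groupBy (SORTED_GROUP_REDUCE in the error message). The sketch below shows in plain Java what that strategy computes: sort the elements by the extracted key, then pass each run of equal keys to the reducer as one group. It does not use any Flink API and is not Flink's implementation. The names `SortedGroupReduceSketch`, `sortedGroupReduce` and `demo` are hypothetical. The type of `data` is elided in the message, so a `String` is assumed here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;

public class SortedGroupReduceSketch {

    // Hypothetical stand-in for FData; "data" is assumed to be a String.
    static final class FData implements Comparable<FData> {
        final String data;
        FData(String data) { this.data = data; }
        @Override
        public int compareTo(FData x) { return data.compareTo(x.data); }
        @Override
        public String toString() { return data; }
    }

    // Sort a copy of the input by key, then hand each run of equal keys
    // (compareTo == 0) to the reducer as a single group.
    static <T, K extends Comparable<K>> void sortedGroupReduce(
            List<T> input, Function<T, K> keySelector, BiConsumer<K, List<T>> reducer) {
        List<T> sorted = new ArrayList<>(input);
        sorted.sort((a, b) -> keySelector.apply(a).compareTo(keySelector.apply(b)));
        int start = 0;
        while (start < sorted.size()) {
            K key = keySelector.apply(sorted.get(start));
            int end = start + 1;
            while (end < sorted.size()
                    && keySelector.apply(sorted.get(end)).compareTo(key) == 0) {
                end++;
            }
            reducer.accept(key, sorted.subList(start, end));
            start = end;
        }
    }

    // Groups three elements; the key selector returns the element itself,
    // like GroupbyKey above. Each group is summarized as "key:size".
    static List<String> demo() {
        List<FData> s = List.of(new FData("b"), new FData("a"), new FData("b"));
        List<String> out = new ArrayList<>();
        sortedGroupReduce(s, v -> v, (key, group) -> out.add(key + ":" + group.size()));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [a:1, b:2]
    }
}
```

Because the groups are formed by sorting, the key type must be comparable, which FData provides through `Comparable<FData>`.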

This gives me the following error:

org.apache.flink.compiler.CompilerException: Error translating node 'GroupReduce "x.FlinkEvaluator$GroupbyReducer" : SORTED_GROUP_REDUCE [[ GlobalProperties [partitioning=RANDOM] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not serialize comparator into the configuration.
     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:346)
     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:100)
     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:145)
     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:146)
     at org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:170)
     at org.apache.flink.client.program.Client.getJobGraph(Client.java:214)
...

(I tried to make the example as simple as possible.)
What is the problem here? Does FData need to implement a different interface?
Thanks
Leonidas Fegaras

