flink-dev mailing list archives

From Leonidas Fegaras <fega...@cse.uta.edu>
Subject Re: Problem with groupBy over custom types
Date Wed, 30 Jul 2014 21:39:54 GMT
Hi Ufuk,
Your getKey returns a String, so it's very simple. Mine must return a
custom type (FData): my real getKey takes an FData and returns a different
FData. I made it return its input here only to reproduce the error.
So my question now is this: can getKey return a Comparable custom type,
or must it always be a simple type such as String?
Thanks
Leonidas
PS. Should your WC class be Serializable?
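The thread does not settle whether Flink's groupBy accepted a custom Comparable key type at the time. As a general Java point, a custom type used as a grouping key should define compareTo, equals and hashCode consistently, because sort-based grouping relies on the first and hash-based grouping on the other two. The stdlib-only sketch below is an analogy, not Flink code: the Key class and the groupByKey helper are hypothetical, and the TreeMap loosely stands in for a sorted group-reduce. As Leonidas describes, it derives a different key object from each record.

```java
import java.util.*;
import java.util.function.Function;

public class CustomKeyGrouping {

    // Hypothetical composite key type; NOT part of Flink's API.
    public static final class Key implements Comparable<Key> {
        final String name;
        final int bucket;

        public Key(String name, int bucket) {
            this.name = name;
            this.bucket = bucket;
        }

        // compareTo, equals and hashCode all use the same two fields,
        // so sort-based and hash-based grouping agree on what a group is.
        @Override
        public int compareTo(Key o) {
            int c = name.compareTo(o.name);
            return c != 0 ? c : Integer.compare(bucket, o.bucket);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) return false;
            Key k = (Key) o;
            return name.equals(k.name) && bucket == k.bucket;
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, bucket);
        }

        @Override
        public String toString() {
            return name + "/" + bucket;
        }
    }

    // Hypothetical helper: groups records by the key the selector derives.
    // The TreeMap keeps groups ordered by Key.compareTo.
    public static <T, K extends Comparable<K>> SortedMap<K, List<T>> groupByKey(
            List<T> input, Function<T, K> keySelector) {
        SortedMap<K, List<T>> groups = new TreeMap<>();
        for (T value : input) {
            groups.computeIfAbsent(keySelector.apply(value), k -> new ArrayList<>()).add(value);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> records =
            Arrays.asList("apple", "apricot", "avocado", "banana", "blueberry", "cherry");
        // Derive a different key object from each record:
        // its first letter plus a length bucket (length / 4).
        SortedMap<Key, List<String>> groups =
            groupByKey(records, s -> new Key(s.substring(0, 1), s.length() / 4));
        groups.forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

If compareTo and equals disagreed (for example, compareTo looked at only one field), a sorted grouping and a hashed grouping could split the same data into different groups.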


On 07/30/2014 04:26 PM, Ufuk Celebi wrote:
> Hey Leonidas,
>
> I think the problem is with the KeySelector. The key selector should specify which field
> of your custom type should be used to do the grouping, but you are currently just returning
> the same object.
>
> So you would have to think about which fields define the separate groups. For example,
> with a custom type for word counts, where you want to group on distinct words:
>
> public class WC {
>      public String word;
>      public int count;
>      // [...]
> }
>
> input.groupBy(new KeySelector<WC, String>() {
>      public String getKey(WC wc) {
>          return wc.word;
>      }
> }).reduce(...);
>
> Does this help? Feel free to get back if you have further questions! :-)
>
> Ufuk
>
> On 30 Jul 2014, at 23:14, Leonidas Fegaras <fegaras@cse.uta.edu> wrote:
>
>> Hi,
>> I am trying to do a groupBy over a DataSet with a custom type (not a Tuple):
>>
>> public class FData implements Serializable, Comparable<FData> {
>>     public ... data;
>>     public FData () { ... }
>>     @Override
>>     public int compareTo ( FData x ) { return data.compareTo(x.data); }
>> ...
>> }
>>
>> Methods map and flatMap work fine on DataSet<FData>. But I have a problem with
>> the following groupBy code:
>>
>> s.groupBy(new GroupbyKey()).reduceGroup(new GroupbyReducer());
>>
>> where s is a DataSet<FData> and the classes are defined as follows:
>>
>> public static final class GroupbyKey extends KeySelector<FData,FData> {
>>    @Override
>>    public FData getKey ( FData value ) { return value; }
>> }
>> public static final class GroupbyReducer extends GroupReduceFunction<FData,FData> {
>>    @Override
>>    public void reduce ( final Iterator<FData> values, Collector<FData> out ) {}
>> }
>>
>> This gives me the following error:
>>
>> org.apache.flink.compiler.CompilerException: Error translating node 'GroupReduce "x.FlinkEvaluator$GroupbyReducer" : SORTED_GROUP_REDUCE [[ GlobalProperties [partitioning=RANDOM] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not serialize comparator into the configuration.
>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:346)
>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:100)
>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:145)
>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:146)
>>     at org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:170)
>>     at org.apache.flink.client.program.Client.getJobGraph(Client.java:214)
>> ...
>>
>> (I tried to make the example as simple as possible).
>> What is the problem here? Do I need to implement FData with a different interface?
>> Thanks
>> Leonidas Fegaras
> .
>
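Ufuk's suggestion, that the key selector return the field defining the groups rather than the whole record, can be illustrated without Flink. The stdlib-only Java sketch below is an analogy, not Flink's API: the BY_WORD function plays the role of the KeySelector<WC, String>, and countWords (a hypothetical helper) computes what a groupBy(...).reduce(...) summing the counts per word would produce.

```java
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

public class WordCountGrouping {

    // Mirrors the WC type from the thread: a word with a partial count.
    public static class WC {
        public String word;
        public int count;

        public WC(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }

    // Stands in for the KeySelector<WC, String>: extracts the field that
    // defines the groups instead of returning the whole object.
    static final Function<WC, String> BY_WORD = wc -> wc.word;

    // Hypothetical helper: groups by the selected key and sums the counts in
    // each group; the TreeMap keeps the words sorted.
    public static TreeMap<String, Integer> countWords(List<WC> input) {
        return input.stream().collect(Collectors.groupingBy(
            BY_WORD, TreeMap::new, Collectors.summingInt((WC wc) -> wc.count)));
    }

    public static void main(String[] args) {
        List<WC> input = Arrays.asList(
            new WC("flink", 1), new WC("group", 2), new WC("flink", 3));
        System.out.println(countWords(input)); // prints {flink=4, group=2}
    }
}
```

Because the key is a plain String, equality and ordering of keys come for free; a custom key type would have to supply them itself.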

