flink-dev mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Problem with groupBy over custom types
Date Wed, 30 Jul 2014 22:48:17 GMT
Hi Leonidas!

What you are doing should actually be supported. Do you have more of the
stack trace?

It seems that some part of the GenericTypeComparator is not
serializable.

Stephan
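
One way to look for a non-serializable part like the one suspected above is
to push the object through standard Java serialization and read the class
name out of the resulting exception. The sketch below is plain Java with no
Flink classes. It assumes the comparator is written with ordinary Java
serialization, which the error message suggests but the thread does not
confirm. SerializationProbe, Payload, BadKey and GoodKey are hypothetical
names made up for this illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationProbe {

    /**
     * Tries to serialize obj with standard Java serialization.
     * Returns null on success, otherwise the name of the class that
     * could not be serialized.
     */
    static String findNonSerializable(Object obj) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            // The exception message carries the offending class name.
            return e.getMessage();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Hypothetical stand-ins, not types from the thread.
    static class Payload { int value = 1; }   // does not implement Serializable

    static class BadKey implements Serializable {
        Payload data = new Payload();         // non-serializable field
    }

    static class GoodKey implements Serializable {
        String data = "a";
    }

    public static void main(String[] args) {
        System.out.println(findNonSerializable(new GoodKey()));
        System.out.println(findNonSerializable(new BadKey()));
    }
}
```

Running the same probe on the real key type and on the key selector would
show whether one of their fields is the culprit, or whether the problem sits
inside the comparator itself.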



On Wed, Jul 30, 2014 at 11:39 PM, Leonidas Fegaras <fegaras@cse.uta.edu>
wrote:

> Hi Ufuk,
> Your getKey returns a String, so it's very simple. Mine must return a
> custom type (FData). So my getKey gets an FData and returns a different
> FData. I just made it identical to show you the error.
> So my question now is this: can getKey return a Comparable custom type, or
> must it always return a simple type, such as String?
> Thanks
> Leonidas
> PS. Should your WC class be Serializable?
>
>
>
> On 07/30/2014 04:26 PM, Ufuk Celebi wrote:
>
>> Hey Leonidas,
>>
>> I think the problem is with the KeySelector. The key selector should
>> specify which field of your custom type should be used to do the grouping,
>> but you are currently just returning the same object.
>>
>> So you would have to think about which fields define the separate groups.
>> For example with a custom type for word counts, where you want to group on
>> distinct words:
>>
>> public class WC {
>>      public String word;
>>      public int count;
>>      // [...]
>> }
>>
>> input.groupBy(new KeySelector<WC, String>() {
>>      public String getKey(WC wc) {
>>          return wc.word;
>>      }
>> }).reduce(...);
>>
>> Does this help? Feel free to get back if you have further questions! :-)
>>
>> Ufuk
>>
>> On 30 Jul 2014, at 23:14, Leonidas Fegaras <fegaras@cse.uta.edu> wrote:
>>
>>> Hi,
>>> I am trying to do a groupBy over a DataSet with a custom type (not a
>>> Tuple):
>>>
>>> public class FData implements Serializable, Comparable<FData> {
>>>     public ... data;
>>>     public FData () { ... }
>>>     @Override
>>>     public int compareTo ( FData x ) { return data.compareTo(x.data); }
>>> ...
>>> }
>>>
>>> Methods map and flatMap work fine on DataSet<FData>. But I have a
>>> problem with the following groupBy code:
>>>
>>> s.groupBy(new GroupbyKey()).reduceGroup(new GroupbyReducer());
>>>
>>> where s is a DataSet<FData> and the classes are defined as follows:
>>>
>>> public static final class GroupbyKey extends KeySelector<FData,FData> {
>>>    @Override
>>>    public FData getKey ( FData value ) { return value; }
>>> }
>>> public static final class GroupbyReducer extends GroupReduceFunction<FData,FData> {
>>>    @Override
>>>    public void reduce ( final Iterator<FData> values, Collector<FData> out ) {}
>>> }
>>>
>>> This gives me the following error:
>>>
>>> org.apache.flink.compiler.CompilerException: Error translating node 'GroupReduce "x.FlinkEvaluator$GroupbyReducer" : SORTED_GROUP_REDUCE [[ GlobalProperties [partitioning=RANDOM] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not serialize comparator into the configuration.
>>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:346)
>>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.preVisit(NepheleJobGraphGenerator.java:100)
>>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:145)
>>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:146)
>>>     at org.apache.flink.compiler.plan.OptimizedPlan.accept(OptimizedPlan.java:165)
>>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.compileJobGraph(NepheleJobGraphGenerator.java:170)
>>>     at org.apache.flink.client.program.Client.getJobGraph(Client.java:214)
>>> ...
>>>
>>> (I tried to make the example as simple as possible).
>>> What is the problem here? Do I need to implement FData with a different
>>> interface?
>>> Thanks
>>> Leonidas Fegaras
>>>
>>
>>
>
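
On the question quoted above of whether getKey can return a custom
Comparable type: the thread does not settle what Flink requires of such a
key. As a plain-Java illustration of the idea only, the sketch below groups
records by a custom key type whose compareTo, equals and hashCode agree with
each other. It uses no Flink classes. WordKey, the groupBy helper and
countWords are hypothetical names for this example, and WC is modeled on the
word-count class from Ufuk's message.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

public class CustomKeyGrouping {

    /** Hypothetical custom key: serializable, comparable, consistent equality. */
    static final class WordKey implements Serializable, Comparable<WordKey> {
        final String word;
        WordKey(String word) { this.word = word; }
        @Override public int compareTo(WordKey other) { return word.compareTo(other.word); }
        @Override public boolean equals(Object o) {
            return o instanceof WordKey && word.equals(((WordKey) o).word);
        }
        @Override public int hashCode() { return word.hashCode(); }
        @Override public String toString() { return word; }
    }

    /** Word-count record, modeled on the WC class quoted in the thread. */
    static final class WC {
        final String word;
        final int count;
        WC(String word, int count) { this.word = word; this.count = count; }
    }

    /** Groups items by the key that keyOf extracts; groups are ordered by key. */
    static <T, K extends Comparable<K>> Map<K, List<T>> groupBy(
            List<T> input, Function<T, K> keyOf) {
        Map<K, List<T>> groups = new TreeMap<>();
        for (T item : input) {
            groups.computeIfAbsent(keyOf.apply(item), k -> new ArrayList<>()).add(item);
        }
        return groups;
    }

    /** Sums the counts of each group, keyed by the custom key type. */
    static Map<WordKey, Integer> countWords(List<WC> input) {
        Map<WordKey, List<WC>> groups = groupBy(input, wc -> new WordKey(wc.word));
        Map<WordKey, Integer> totals = new TreeMap<>();
        for (Map.Entry<WordKey, List<WC>> group : groups.entrySet()) {
            int sum = 0;
            for (WC wc : group.getValue()) {
                sum += wc.count;
            }
            totals.put(group.getKey(), sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<WC> input = Arrays.asList(new WC("b", 5), new WC("a", 1), new WC("a", 2));
        System.out.println(countWords(input)); // prints {a=3, b=5}
    }
}
```

The key here is a new, small object derived from the record rather than the
record itself, which mirrors the case described above where getKey takes one
FData and returns a different one.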
