Date: Thu, 31 Jul 2014 00:48:17 +0200
Subject: Re: Problem with groupBy over custom types
From: Stephan Ewen
To: dev@flink.incubator.apache.org, fegaras@cse.uta.edu

Hi Leonidas!

What you are doing should actually be supported. Do you have more of the
stack-trace? It seems that there is some non-serializable part somewhere in
the GenericTypeComparator..

Stephan


On Wed, Jul 30, 2014 at 11:39 PM, Leonidas Fegaras wrote:

> Hi Ufuk,
> Your getKey returns a String, so it's very simple. Mine must return a
> custom type (FData). So my getKey gets an FData and returns a different
> FData. I just made it identical to show you the error.
> So my question now is this: can getKey return a Comparable custom type or
> it must always be a simple type, such as String?
> Thanks
> Leonidas
> PS. Should your WC class be Serializable?
>
>
>
> On 07/30/2014 04:26 PM, Ufuk Celebi wrote:
>
>> Hey Leonidas,
>>
>> I think the problem is with the KeySelector. The key selector should
>> specify which field of your custom type should be used to do the grouping,
>> but you are currently just returning the same object.
>>
>> So you would have to think about which fields define the separate groups.
>> For example with a custom type for word counts, where you want to group on
>> distinct words:
>>
>> public class WC {
>>   public String word;
>>   public int count;
>>   // [...]
>> }
>>
>> input.groupBy(new KeySelector<WC, String>() {
>>   public String getKey(WC wc) {
>>     return wc.word;
>>   }
>> }).reduce(...);
>>
>> Does this help? Feel free to get back if you have further questions! :-)
>>
>> Ufuk
>>
>> On 30 Jul 2014, at 23:14, Leonidas Fegaras wrote:
>>
>>> Hi,
>>> I am trying to do a groupBy over a DataSet with a custom type (not a
>>> Tuple):
>>>
>>> public class FData implements Serializable, Comparable<FData> {
>>>     public ... data;
>>>     public FData () { ... }
>>>     @Override
>>>     public int compareTo ( FData x ) { return data.compareTo(x.data); }
>>>     ...
>>> }
>>>
>>> Methods map and flatMap work fine on DataSet<FData>. But I have a
>>> problem with the following groupBy code:
>>>
>>> s.groupBy(new GroupbyKey()).reduceGroup(new GroupbyReducer());
>>>
>>> where s is a DataSet<FData> and the classes are defined as follows:
>>>
>>> public static final class GroupbyKey extends KeySelector<FData, FData> {
>>>     @Override
>>>     public FData getKey ( FData value ) { return value; }
>>> }
>>> public static final class GroupbyReducer extends
>>> GroupReduceFunction {
>>>     @Override
>>>     public void reduce ( final Iterator values, Collector
>>> out ) {}
>>> }
>>>
>>> This gives me the following error:
>>>
>>> org.apache.flink.compiler.CompilerException: Error translating node
>>> 'GroupReduce "x.FlinkEvaluator$GroupbyReducer" : SORTED_GROUP_REDUCE [[
>>> GlobalProperties [partitioning=RANDOM] ]] [[ LocalProperties
>>> [ordering=null, grouped=null, unique=null] ]]': Could not serialize
>>> comparator into the configuration.
>>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.
>>> preVisit(NepheleJobGraphGenerator.java:346)
>>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.
>>> preVisit(NepheleJobGraphGenerator.java:100)
>>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.
>>> accept(SingleInputPlanNode.java:145)
>>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.
>>> accept(SingleInputPlanNode.java:146)
>>>     at org.apache.flink.compiler.plan.OptimizedPlan.accept(
>>> OptimizedPlan.java:165)
>>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.
>>> compileJobGraph(NepheleJobGraphGenerator.java:170)
>>>     at org.apache.flink.client.program.Client.getJobGraph(
>>> Client.java:214)
>>>     ...
>>>
>>> (I tried to make the example as simple as possible).
>>> What is the problem here? Do I need to implement FData with a different
>>> interface?
>>> Thanks
>>> Leonidas Fegaras
>>>
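
One way to gather evidence for the "non-serializable part" suspicion raised in this thread is to push a key object through plain Java serialization outside of Flink and see which class, if any, is rejected. The sketch below is not Flink code. SerializationCheck, GoodKey and BadKey are hypothetical names, the field types are assumptions (the fields of FData are elided in the thread), and it assumes the failing step relies on java.io serialization, which the thread does not confirm.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical diagnostic helper, not part of Flink.
final class SerializationCheck {
    private SerializationCheck() {}

    /**
     * Tries to serialize obj with standard Java serialization.
     * Returns null on success, otherwise the message of the
     * NotSerializableException (by default the offending class name).
     */
    static String firstNonSerializable(Object obj) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new IllegalStateException("unexpected I/O error", e);
        }
    }
}

// Stand-in for a well-behaved key type; the String field is an assumption.
final class GoodKey implements Serializable, Comparable<GoodKey> {
    private static final long serialVersionUID = 1L;
    final String data;

    GoodKey(String data) { this.data = data; }

    @Override
    public int compareTo(GoodKey other) { return data.compareTo(other.data); }
}

// Declares Serializable but holds a field whose runtime type is not.
final class BadKey implements Serializable {
    private static final long serialVersionUID = 1L;
    final Object payload = new Object(); // java.lang.Object is not Serializable
}
```

Calling SerializationCheck.firstNonSerializable on a real key instance would show whether the key type itself is the culprit. A null result points away from the key and back toward the comparator or other classes involved.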