Subject: Re: Scala Code Generation
From: schultze@informatik.hu-berlin.de
To: user@flink.apache.org
Date: Sun, 18 Oct 2015 14:22:56 +0200 (CEST)

By now I have been able to reproduce the error with several more queries.
However, it appears to be a problem only in Flink's local mode; during
cluster execution everything works just fine.

Regards,
Max

> Thanks a lot for the help.
>
> I was able to apply the Tuple1 workaround to fix my problem. I also
> moved up to Flink 0.9.
>
> However, I have another problem executing generated Scala programs. It
> seems as if a Scala program executed with a Flink 0.9 job manager only
> supports a limited number of operators.
> I use the Flink quickstart package to generate executable .jar files
> (using mvn clean package). The following is a simple example program
> generated by my compiler from a rewritten AQL query of TPC-H query Q6.
> Whenever I pack it into a .jar file and try to execute it using a local
> job manager, I get a "Class not found" error; when I remove any one of
> the operators, however, it works just fine. I also ran the example
> within Eclipse using the old Flink 0.8 quickstart package.
> Interestingly, it worked fine there too, no matter how many operators I
> used. Does the Scala environment in Flink 0.9 indeed support only a
> limited number of operators? Is this a configuration issue, and is it
> possible to increase that number?
>
> This is the query I ran:
>
> import org.apache.flink.api.scala._
> import org.apache.flink.api.java.aggregation
>
> object Job {
>   def main(args: Array[String]) {
>     val env = ExecutionEnvironment.getExecutionEnvironment
>
>     val $l = env.readCsvFile[(Int,Int,Int,Int,Double,Double,Double,Double,String,String,String,String,String,String,String,String)]("/home/mcs1408/TPCH_data/lineitem.tbl", "\n", "|")
>
>     val val0 = $l.filter( x => x._11 >= "1994-01-01")
>     val val1 = val0.filter( x => x._11 < "1995-01-01")
>     val val2 = val1.filter( x => x._7 >= -0.01)
>     val val3 = val2.filter( x => x._7 < 0.01)
>     val val4 = val3.filter( x => x._5 < 24)
>     val val5 = val4.map{ x => (x._1, x._2, x._3, x._4, x._5, x._8, x._9,
>       x._10, x._11, x._12, x._13, x._14, x._15, x._16, x._6 * (1 - x._7)) }
>       .sum(14)
>     val val6 = val5.map{ x => Tuple1(x._15) }
>       .writeAsCsv("/home/mcs1408/TPCH_data/result", "\n", "|")
>
>     env.execute("Flink Scala API parsed AQL Query")
>   }
> }
>
> Thanks a lot for any help!
> Best regards,
> Max Schultze

>> If you're using Scala, then you're bound to a maximum of 22 fields in
>> a tuple, because the Scala library does not provide larger tuples. You
>> could generate your own case classes with more than 22 fields, though.
>>
>> On Oct 14, 2015 11:30 AM, "Ufuk Celebi" wrote:
>>
>>> > On 13 Oct 2015, at 16:06, schultze@informatik.hu-berlin.de wrote:
>>> >
>>> > Hello,
>>> >
>>> > I am currently working on a compilation unit translating
>>> > AsterixDB's AQL into runnable Scala code for Flink's Scala API.
>>> > During code generation I discovered some things that are quite hard
>>> > to work around. I am still working with Flink version 0.8, so some
>>> > of the problems I have might already be fixed in 0.9; if so, please
>>> > tell me.
>>> >
>>> > First, whenever a record gets projected down to only a single field
>>> > (e.g. by a map or reduce function), it is no longer considered a
>>> > record but a variable of the type of that field. If I afterwards
>>> > want to apply additional functions like .sum(0), I get an error
>>> > message like
>>>
>>> A workaround is to return Tuple1 for this. Then you can run the
>>> aggregation. I think that the Tuple0 class has been added after 0.8,
>>> though.
>>>
>>> > "Aggregating on field positions is only possible on tuple data
>>> > types."
>>> >
>>> > This is the same for all functions (like write or join), as the
>>> > "record" is no longer considered a dataset.
>>>
>>> What do you mean? At least in the current versions, the join
>>> projections return a Tuple type as well.
>>>
>>> > Second, I found that records longer than 22 fields are not
>>> > supported. Whenever I have a record that is longer than that, I
>>> > receive a build error such as
>>>
>>> Flink's Tuple classes go up to Tuple25. You can work around this by
>>> using a custom POJO type, e.g.
>>>
>>> class TPCHRecord {
>>>     public int f0;
>>>     ...
>>>     public int f99;
>>> }
>>>
>>> If possible, I would suggest updating to the latest 0.9 or the
>>> upcoming 0.10 release. A lot of stuff has been fixed since 0.8. I
>>> think it will be worth it. If you encounter any problems while doing
>>> this, feel free to ask here. :)
>>>
>>> – Ufuk
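The Tuple1 workaround discussed in the thread can be sketched in plain Scala, with no Flink dependency (a Seq stands in for a DataSet here, and the values are made up for the example): mapping to a bare Double loses field positions, while wrapping in Tuple1 keeps positional access (_1) available, which is what positional operations like .sum(0) rely on.

```scala
object Tuple1Demo {
  def main(args: Array[String]): Unit = {
    val prices = Seq(10.0, 20.0, 12.5)

    // Without Tuple1: a bare Double has no field positions.
    val bare: Seq[Double] = prices.map(x => x * 0.9)

    // With Tuple1: each element still has a field position (_1),
    // analogous to what Flink's positional aggregations require.
    val wrapped: Seq[Tuple1[Double]] = prices.map(x => Tuple1(x * 0.9))

    val total = wrapped.map(_._1).sum
    println(total) // prints 38.25
  }
}
```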
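For records wider than Scala's Tuple22 limit, the POJO-style workaround Ufuk sketches in Java can look roughly like this in Scala (class and field names below are illustrative, not from the thread): a plain class with public var fields and a no-argument constructor, the shape Flink's POJO serialization expects.

```scala
// A record wider than 22 fields cannot be a Scala tuple, but a plain
// class with public var fields works as a POJO-style record.
// Field names and types are illustrative.
class WideRecord {
  var f0: Int = 0
  var f1: Int = 0
  // ... as many fields as the record needs ...
  var f22: Double = 0.0
  var f23: String = ""
}

object WideRecordDemo {
  def main(args: Array[String]): Unit = {
    val r = new WideRecord
    r.f22 = 3.5
    r.f23 = "1994-01-01"
    println(s"${r.f22} ${r.f23}") // prints 3.5 1994-01-01
  }
}
```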