flink-user mailing list archives

From schul...@informatik.hu-berlin.de
Subject Re: Scala Code Generation
Date Sun, 18 Oct 2015 12:22:56 GMT
I have since been able to reproduce the error with several more queries.
However, it seems to be a problem only in Flink's local mode. During cluster
execution everything works just fine.

Regards, Max


> Thanks a lot for the help.
>
> I was able to apply the Tuple1 functionality to fix my problem. I also
> moved up to Flink 0.9.
>
> However, I have another problem executing generated Scala programs. It
> seems that a Scala program executed with a Flink 0.9 Job Manager can only
> use a limited number of operators. I use the Flink quickstart package to
> generate executable .jar files (using mvn clean package). The following
> is a simple example program generated by my compiler from a rewritten AQL
> query of TPCH query Q6. Whenever I pack it into a .jar file and try to
> execute it using a local job manager, I get a "Class not found" error;
> however, when I remove any one of the operators it works just fine. I also
> ran the example within Eclipse using the old Flink 0.8 quickstart package.
> Interestingly, it worked fine there, too, no matter how many operators I
> used. Does the Scala environment in Flink 0.9 really only support a
> limited number of operators? Is this a configuration issue, and is it
> possible to increase that number?
>
> This is the query I ran:
>
> import org.apache.flink.api.scala._
> import org.apache.flink.api.java.aggregation
>
> object Job {
>   def main(args: Array[String]) {
>     val env = ExecutionEnvironment.getExecutionEnvironment
>
>     val $l = env.readCsvFile[(Int,Int,Int,Int,Double,Double,Double,Double,String,String,String,String,String,String,String,String)](
>         "/home/mcs1408/TPCH_data/lineitem.tbl", "\n", "|")
>     val val0 = $l.filter( x => x._11 >= "1994-01-01")
>     val val1 = val0.filter( x => x._11 < "1995-01-01")
>     val val2 = val1.filter( x => x._7 >= -0.01)
>     val val3 = val2.filter( x => x._7 < 0.01)
>     val val4 = val3.filter( x => x._5 < 24)
>     val val5 = val4.map{ x => (x._1, x._2, x._3, x._4, x._5, x._8, x._9,
>         x._10, x._11, x._12, x._13, x._14, x._15, x._16, x._6 * (1 - x._7)) }
>         .sum(14)
>     val val6 = val5.map{ x => Tuple1(x._15) }
>         .writeAsCsv("/home/mcs1408/TPCH_data/result", "\n", "|")
>
>     env.execute("Flink Scala API parsed AQL Query")
>   }
> }
>
> Thanks a lot for any help!
> Best regards,
> Max Schultze
>
>
>
>> If you're using Scala, then you're bound to a maximum of 22 fields in a
>> tuple, because the Scala library does not provide larger tuples. You
>> could generate your own case classes which have more than the 22 fields,
>> though.
>> On Oct 14, 2015 11:30 AM, "Ufuk Celebi" <uce@apache.org> wrote:
>>
>>>
>>> > On 13 Oct 2015, at 16:06, schultze@informatik.hu-berlin.de wrote:
>>> >
>>> > Hello,
>>> >
>>> > I am currently working on a compilation unit translating AsterixDB's
>>> > AQL into runnable Scala code for Flink's Scala API. During code
>>> > generation I discovered some things that are quite hard to work around.
>>> > I am still working with Flink version 0.8, so some of the problems I
>>> > have might already be fixed in 0.9; if so, please tell me.
>>> >
>>> > First, whenever a record gets projected down to only a single field
>>> > (e.g. by a map or reduce function) it is no longer considered a record,
>>> > but a variable of the type of that field. If afterwards I want to apply
>>> > additional functions like .sum(0) I get an error message like
>>>
>>> A workaround is to return Tuple1<X> for this. Then you can run the
>>> aggregation. I think that the Tuple0 class has been added after 0.8,
>>> though.
>>>
>>> > "Aggregating on field positions is only possible on tuple data types."
>>> >
>>> > This is the same for all functions (like write or join) as the
>>> > "record" is no longer considered a dataset.
>>>
>>> What do you mean? At least in the current versions, the join
>>> projections return a Tuple type as well.
>>>
>>> > Second, I found that records longer than 22 fields are not supported.
>>> > Whenever I have a record that is longer than that I receive a build
>>> > error as
>>>
>>> Flink’s Tuple classes go up to Tuple25. You can work around this by
>>> using a custom POJO type, e.g.
>>>
>>> class TPCHRecord {
>>>     public int f0;
>>>     ...
>>>     public int f99;
>>> }
>>>
>>> If possible, I would suggest updating to the latest 0.9 or the upcoming
>>> 0.10 release. A lot of stuff has been fixed since 0.8. I think it will
>>> be worth it. If you encounter any problems while doing this, feel free
>>> to ask here. :)
>>>
>>> – Ufuk
>>
>
>
>
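
The Tuple1 workaround discussed in the thread above can be sketched in plain
Scala, without any Flink dependency. This is only an illustration of the
idea, not Flink's implementation: the sample rows and the helper sumField are
hypothetical, and sumField merely stands in for a positional aggregate such
as .sum(0). The point is that a bare Double has no field positions, while a
Tuple1[Double] is still a one-field tuple.

```scala
object Tuple1Sketch {
  // Hypothetical sample rows: (orderKey, revenue). Not data from the thread.
  val rows: Seq[(Int, Double)] = Seq((1, 10.0), (2, 2.5), (3, 4.0))

  // Projecting to a bare Double: the result is no longer a tuple,
  // so there is no "field 0" left to aggregate on.
  val bare: Seq[Double] = rows.map(_._2)

  // Wrapping the single field in Tuple1 keeps a tuple with one position.
  val wrapped: Seq[Tuple1[Double]] = rows.map(r => Tuple1(r._2))

  // Hypothetical stand-in for a positional aggregate like .sum(0):
  // sums the Double stored at tuple position `pos`.
  def sumField(data: Seq[Product], pos: Int): Double =
    data.map(_.productElement(pos).asInstanceOf[Double]).sum

  def main(args: Array[String]): Unit =
    println(sumField(wrapped, 0))
}
```

In the generated program quoted above, the same idea appears as
val5.map{ x => Tuple1(x._15) } before the result is written as CSV.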


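The suggestion in the thread above to use a custom class for records wider
than 22 fields might look roughly like this in Scala. It mirrors the
TPCHRecord outline with a plain class and mutable fields. The class name
WideRecord, the field count of 23, and the Int field types are all
hypothetical, and whether a given Flink version's type analysis accepts such
a class as a POJO is an assumption that is not verified here.

```scala
// Hypothetical wide record with 23 Int fields, one more than
// Scala's largest built-in tuple (Tuple22).
class WideRecord {
  // Each line declares several mutable fields, all initialised to 0.
  var f0, f1, f2, f3, f4, f5, f6, f7: Int = 0
  var f8, f9, f10, f11, f12, f13, f14, f15: Int = 0
  var f16, f17, f18, f19, f20, f21, f22: Int = 0
}

object WideRecordSketch {
  def main(args: Array[String]): Unit = {
    val r = new WideRecord   // the implicit no-argument constructor
    r.f22 = 42               // fields are addressed by name, not by position
    println(r.f0 + r.f22)
  }
}
```

A code generator could emit one such class per record layout and refer to
fields by name instead of by tuple position.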
