flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Lazy Evaluation
Date Tue, 21 Jun 2016 16:43:36 GMT
Great to hear that it works now! :-)

On Sun, 19 Jun 2016 at 16:33 Paschek, Robert <robert.paschek@tu-berlin.de>
wrote:

> Hi Mailing List,
>
> after "upgrading" the Flink version in my pom.xml to 1.0.3, I get two
> error messages for the output variants that don't work:
>
> org.apache.flink.api.common.functions.InvalidTypesException: The return
> type of function 'main(MR_GPSRS.java:69)' could not be determined
> automatically, due to type erasure. You can give type information hints by
> using the returns(...) method on the result of the transformation call, or
> by letting your function implement the 'ResultTypeQueryable' interface.
>
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
> Type of TypeVariable 'OUT' in 'class
> org.apache.flink.api.common.functions.RichGroupReduceFunction' could not be
> determined. This is most likely a type erasure problem. The type extraction
> currently supports types with generic variables only in cases where all
> variables in the return type can be deduced from the input type(s).
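> The type-erasure limitation the message refers to can be reproduced in
> plain Java, independent of Flink: the type arguments of a generic class
> are discarded at compile time, so a framework inspecting the function
> object at runtime cannot recover them.

```java
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {
    public static void main(String[] args) {
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();

        // Both type arguments are erased: at runtime the two lists
        // share exactly the same class object.
        System.out.println(strings.getClass() == ints.getClass()); // prints "true"

        // Reflection on the instance therefore cannot tell whether the
        // element type was String or Integer.
        System.out.println(strings.getClass().getName()); // prints "java.util.ArrayList"
    }
}
```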
>
>
> After adding ".returns(input.getType())" to my transformation,
> everything works great now : - )
> Many thanks to the developers who added these messages in recent
> versions!
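> In code form, the fix is a one-line hint on the transformation whose
> return type Flink cannot infer. A sketch only, not the exact program;
> 'input' and MR_GPSRS_Reducer stand in for the real DataSet and reducer:

```java
// Sketch: the returns(...) hint supplies the output TypeInformation that
// type extraction could not deduce from the generic reducer signature.
// Here the reducer emits the same element type it consumes, so the
// input's own TypeInformation can simply be reused.
DataSet<?> result = input
        .reduceGroup(new MR_GPSRS_Reducer())
        .returns(input.getType());
```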
>
> Best,
> Robert
> ________________________________________
> From: Paschek, Robert <robert.paschek@tu-berlin.de>
> Sent: Tuesday, 14 June 2016 19:52
> To: user@flink.apache.org
> Subject: Lazy Evaluation
>
> Hi Mailing List,
>
> I probably have a problem with lazy evaluation. Depending on the
> "return" data type of my last transformation (GroupReduce), the
> integrated Flink mini cluster does not start. I have done the following:
>
> // Configuration
> Configuration parameters = new Configuration();
> parameters.setString("path", "generated_2000000_tuples_10_dimensions_100.0_mean_25.0_std_and_498762467_seed.csv");
> parameters.setString("output", "result_MR_GPSRS.csv");
> parameters.setInteger("dimensionality", 10);
> parameters.setInteger("cardinality", 2000000);
> parameters.setDouble("min", 0.0);
> parameters.setDouble("max", 200.0);
>
> // Setting up the execution environment
> final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
> // Reading CSV
> DataSet<?> input = InputConfig.setDataSetFromCSV(parameters.getInteger("dimensionality", 2), env, parameters.getString("path", ""));
>
> // Broadcast BitString, Cardinality, Dimensionality, PPD
> @SuppressWarnings({ "unchecked", "rawtypes" })
> DataSet<Tuple4<BitSet, Integer, Integer, Integer>> metaData = input
>         .mapPartition(new MR_GP_BitStringGeneratorMapper())
>         .reduceGroup(new MR_GP_BitStringGeneratorReducer());
>
> // Calculate result
> @SuppressWarnings({ "unchecked", "rawtypes" })
> DataSet<?> result = input
>         .mapPartition(new MR_GPSRS_Mapper())
>         .withBroadcastSet(metaData, "MetaData").withParameters(parameters)
>         .reduceGroup(new MR_GPSRS_Reducer())
>         .withBroadcastSet(metaData, "MetaData").withParameters(parameters);
>
>
> try {
>     result.writeAsCsv(parameters.getString("output", ""), FileSystem.WriteMode.OVERWRITE);
>     JobExecutionResult job = env.execute();
>     System.out.println("Runtime in seconds: " + (job.getNetRuntime() / 1000));
> } catch (Exception e) {
>     e.printStackTrace(); // don't swallow the exception silently
> }
>
>
>
> When I run my program with the integrated Flink mini cluster in Eclipse,
> the console outputs only the following:
> 19:00:12,978 INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class java.util.BitSet is not a valid POJO type
> 19:00:13,010 INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class java.util.BitSet is not a valid POJO type
> 19:00:13,017 INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class java.util.ArrayList is not a valid POJO type
> 19:00:13,021 INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class java.util.ArrayList is not a valid POJO type
>
> The MR_GPSRS_Reducer looks like the following:
> public class MR_GPSRS_Reducer<T extends Tuple> extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, T> {
> [...]
> @Override
> public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, Collector<T> out) throws Exception {
> [...]
>     for (T tuple : tuples) {
>         out.collect(tuple);
>     }
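> The error message quoted above also names a second option besides
> returns(...): the function itself can implement ResultTypeQueryable and
> report its output type. A hypothetical sketch for this reducer, assuming
> the caller (who still knows the concrete type, e.g. via input.getType())
> passes the output TypeInformation through the constructor:

```java
public class MR_GPSRS_Reducer<T extends Tuple>
        extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, T>
        implements ResultTypeQueryable<T> {

    // Concrete output type, handed in explicitly so it survives erasure.
    private final TypeInformation<T> outType;

    public MR_GPSRS_Reducer(TypeInformation<T> outType) {
        this.outType = outType;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return outType;
    }

    @Override
    public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition,
                       Collector<T> out) throws Exception {
        // [...] actual skyline logic elided
    }
}
```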
>
> If I change MR_GPSRS_Reducer to the following, it still shows the same behavior:
> public class MR_GPSRS_Reducer<T extends Tuple> extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, Tuple1<T>> {
> [...]
> @Override
> public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, Collector<Tuple1<T>> out) throws Exception {
> [...]
>     for (T tuple : tuples) {
>         out.collect(new Tuple1<T>(tuple));
>     }
>
> Same as here:
> public class MR_GPSRS_Reducer<T extends Tuple> extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, ArrayList<T>> {
> [...]
> public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, Collector<ArrayList<T>> out) throws Exception {
> [...]
>     out.collect(tuples);
>
> Only if I change MR_GPSRS_Reducer to the following does the Flink mini
> cluster start:
> public class MR_GPSRS_Reducer<T extends Tuple> extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, Tuple1<ArrayList<T>>> {
> [...]
> public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, Collector<Tuple1<ArrayList<T>>> out) throws Exception {
> [...]
>     out.collect(new Tuple1<ArrayList<T>>(tuples));
> (The hint about the invalid POJO types still appears.)
>
> But that isn't my preferred format for the DataSink...
> If I comment out the MR_GPSRS_Reducer (the last transformation), the
> Flink mini cluster also starts.
>
>
> Does anybody have an idea how I can teach Flink to execute my program
> with T as the DataSink type? (In that case, T would be a Tuple10 of
> Doubles.)
> (I have already tried to explicitly cast the data sets and
> transformations so that suppressing the warnings is no longer
> necessary.)
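> Since the element type is known at the call site to be a Tuple10 of
> Doubles, the hint can also name that type explicitly through a TypeHint.
> A sketch, assuming a Flink 1.x version where TypeHint is available; the
> anonymous subclass preserves the generic type argument in its class
> file, so it survives erasure:

```java
// Hypothetical: spell out the concrete output type at the call site
// instead of reusing input.getType().
DataSet<Tuple10<Double, Double, Double, Double, Double,
                Double, Double, Double, Double, Double>> result = input
        .reduceGroup(new MR_GPSRS_Reducer())
        .returns(TypeInformation.of(
                new TypeHint<Tuple10<Double, Double, Double, Double, Double,
                                     Double, Double, Double, Double, Double>>() {}));
```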
>
> I'm using <flink.version>1.0.1</flink.version>
>
> Thank you in advance
> Robert
>
>
>
