flink-user mailing list archives

From "Paschek, Robert" <robert.pasc...@tu-berlin.de>
Subject Re: Lazy Evaluation
Date Sun, 19 Jun 2016 14:33:28 GMT
Hi Mailing List,

after "upgrading" the Flink version in my pom.xml to 1.0.3, I get the following two error
messages for the output variants that do not work:

org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(MR_GPSRS.java:69)'
could not be determined automatically, due to type erasure. You can give type information
hints by using the returns(...) method on the result of the transformation call, or by letting
your function implement the 'ResultTypeQueryable' interface.

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable
'OUT' in 'class org.apache.flink.api.common.functions.RichGroupReduceFunction' could not be
determined. This is most likely a type erasure problem. The type extraction currently supports
types with generic variables only in cases where all variables in the return type can be deduced
from the input type(s).


After adding ".returns(input.getType())" to my transformation, everything works great
now :-)
Many thanks to the developers who added these messages in the latest versions!
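
For readers who want to see the cause behind the error messages above: the following is a plain-Java sketch, with no Flink dependency, of what reflection can and cannot see about a generic function class. BaseFunction, GenericReducer, StringReducer and outputTypeOf are hypothetical names made up for this illustration; this is not Flink's TypeExtractor, only the underlying reflection behaviour it has to work with. It assumes the function class directly extends the base class.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;

public class ErasureDemo {

    // Hypothetical stand-in for a function base class with IN and OUT type variables.
    abstract static class BaseFunction<IN, OUT> {}

    // Same shape as the reducer in this thread: OUT is the unbound variable T.
    static class GenericReducer<T> extends BaseFunction<ArrayList<ArrayList<T>>, T> {}

    // Here OUT is bound to a concrete class in the class declaration itself.
    static class StringReducer extends BaseFunction<ArrayList<ArrayList<String>>, String> {}

    // Returns the OUT type argument as declared by a direct subclass of BaseFunction.
    static Type outputTypeOf(BaseFunction<?, ?> function) {
        ParameterizedType base =
                (ParameterizedType) function.getClass().getGenericSuperclass();
        return base.getActualTypeArguments()[1];
    }

    public static void main(String[] args) {
        // The <String> given at construction time is erased; only the variable T is visible.
        Type generic = outputTypeOf(new GenericReducer<String>());
        // The declaration of StringReducer keeps the concrete type argument.
        Type bound = outputTypeOf(new StringReducer());

        System.out.println("GenericReducer OUT: " + generic.getTypeName());
        System.out.println("StringReducer OUT:  " + bound.getTypeName());
    }
}
```

In the first case reflection only yields a type variable, which matches the situation the exception describes: the output type cannot be deduced from the class alone, so it has to be supplied explicitly, for example through the returns(...) hint mentioned in the error message.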

Best,
Robert
________________________________________
From: Paschek, Robert <robert.paschek@tu-berlin.de>
Sent: Tuesday, 14 June 2016 19:52
To: user@flink.apache.org
Subject: Lazy Evaluation

Hi Mailing List,

I probably have a problem with lazy evaluation. Depending on the "return" data type
of my last transformation (GroupReduce), the integrated Flink mini cluster does not start.
I have done the following:

// Configuration
Configuration parameters = new Configuration();
parameters.setString("path", "generated_2000000_tuples_10_dimensions_100.0_mean_25.0_std_and_498762467_seed.csv");
parameters.setString("output", "result_MR_GPSRS.csv");
parameters.setInteger("dimensionality", 10);
parameters.setInteger("cardinality", 2000000);
parameters.setDouble("min", 0.0);
parameters.setDouble("max", 200.0);

// Setting Up Execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Reading CSV
DataSet<?> input = InputConfig.setDataSetFromCSV(parameters.getInteger("dimensionality", 2), env, parameters.getString("path", ""));


//Broadcast BitString, Cardinality, Dimensionality, PPD
@SuppressWarnings({ "unchecked", "rawtypes" })
DataSet<Tuple4<BitSet,Integer,Integer,Integer>> metaData =
                                input
                                .mapPartition(new MR_GP_BitStringGeneratorMapper())
                                .reduceGroup(new MR_GP_BitStringGeneratorReducer());

// Calculate result
@SuppressWarnings({ "unchecked", "rawtypes" })
DataSet<?> result = input
                                .mapPartition(new MR_GPSRS_Mapper()).withBroadcastSet(metaData, "MetaData").withParameters(parameters)
                                .reduceGroup(new MR_GPSRS_Reducer()).withBroadcastSet(metaData, "MetaData").withParameters(parameters);


try {
        result.writeAsCsv(parameters.getString("output", ""), FileSystem.WriteMode.OVERWRITE);
        JobExecutionResult job = env.execute();
        System.out.println("Runtime in seconds: "+(job.getNetRuntime()/1000));
} catch (Exception e) {
        // TODO Auto-generated catch block
}



When I run my program with the integrated Flink mini cluster in Eclipse, the console outputs
only the following:
19:00:12,978 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class java.util.BitSet is not a valid POJO type
19:00:13,010 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class java.util.BitSet is not a valid POJO type
19:00:13,017 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class java.util.ArrayList is not a valid POJO type
19:00:13,021 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class java.util.ArrayList is not a valid POJO type

The MR_GPSRS_Reducer looks like the following:
public class MR_GPSRS_Reducer <T extends Tuple> extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, T>
[...]
@Override
public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, Collector<T> out) throws Exception {
[...]
for (T tuple : tuples) {
    out.collect(tuple);
}

If I change the code of MR_GPSRS_Reducer to the following, the behavior stays the same:
public class MR_GPSRS_Reducer <T extends Tuple> extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, Tuple1<T>>{
[...]
@Override
public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, Collector<Tuple1<T>> out) throws Exception {
[...]
for (T tuple : tuples) {
    out.collect(new Tuple1<T>(tuple));
}

The same happens with this variant:
public class MR_GPSRS_Reducer <T extends Tuple> extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, ArrayList<T>>{
[...]
public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, Collector<ArrayList<T>> out) throws Exception {
[...]
out.collect(tuples);

Only if I change the MR_GPSRS_Reducer to the following does the Flink mini cluster start:
public class MR_GPSRS_Reducer <T extends Tuple> extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, Tuple1<ArrayList<T>>>{
[...]
public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, Collector<Tuple1<ArrayList<T>>> out) throws Exception {
[...]
out.collect(new Tuple1<ArrayList<T>>(tuples));
(The hints about invalid POJO types still remain.)

But that isn't my preferred format for the DataSink...
If I comment out the MR_GPSRS_Reducer (the last transformation), the Flink mini cluster also
starts.


Does anybody have an idea how I can get Flink to execute my program with T as the type written
to the DataSink? (In that case, T would be a Tuple10 of Doubles.)
(I have already tried to explicitly typecast the datasets and transformations, so that
suppressing the warnings is no longer necessary.)

I'm using <flink.version>1.0.1</flink.version>

Thank you in advance
Robert


