flink-user mailing list archives

From "Paschek, Robert" <robert.pasc...@tu-berlin.de>
Subject Lazy Evaluation
Date Tue, 14 Jun 2016 17:52:01 GMT
Hi Mailing List,

I probably have a problem with lazy evaluation. Depending on the return data type
of my last transformation (a GroupReduce), the integrated Flink mini cluster does not start.
I have done the following:

// Configuration
Configuration parameters = new Configuration();
parameters.setString("path", "generated_2000000_tuples_10_dimensions_100.0_mean_25.0_std_and_498762467_seed.csv");
parameters.setString("output", "result_MR_GPSRS.csv");
parameters.setInteger("dimensionality", 10);
parameters.setInteger("cardinality", 2000000);
parameters.setDouble("min", 0.0);
parameters.setDouble("max", 200.0);

// Setting Up Execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();		

// Reading CSV
DataSet<?> input = InputConfig.setDataSetFromCSV(
		parameters.getInteger("dimensionality", 2), env, parameters.getString("path", ""));

//Broadcast BitString, Cardinality, Dimensionality, PPD
@SuppressWarnings({ "unchecked", "rawtypes" })
DataSet<Tuple4<BitSet,Integer,Integer,Integer>> metaData =
				input
				.mapPartition(new MR_GP_BitStringGeneratorMapper())
				.reduceGroup(new MR_GP_BitStringGeneratorReducer());

// Calculate result
@SuppressWarnings({ "unchecked", "rawtypes" })
DataSet<?> result = input
				.mapPartition(new MR_GPSRS_Mapper()).withBroadcastSet(metaData, "MetaData").withParameters(parameters)
				.reduceGroup(new MR_GPSRS_Reducer()).withBroadcastSet(metaData, "MetaData").withParameters(parameters);
		

try {
	result.writeAsCsv(parameters.getString("output", ""), FileSystem.WriteMode.OVERWRITE);
	JobExecutionResult job = env.execute();
	System.out.println("Runtime in seconds: " + (job.getNetRuntime() / 1000));
} catch (Exception e) {
	e.printStackTrace(); // do not swallow the exception silently
}



When I run my program with the integrated Flink mini cluster in Eclipse, the console outputs
only the following:
19:00:12,978 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class java.util.BitSet is not a valid POJO type
19:00:13,010 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class java.util.BitSet is not a valid POJO type
19:00:13,017 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class java.util.ArrayList is not a valid POJO type
19:00:13,021 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class java.util.ArrayList is not a valid POJO type

The MR_GPSRS_Reducer looks like the following:
public class MR_GPSRS_Reducer<T extends Tuple> extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, T> {
[...]
@Override
public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, Collector<T> out) throws Exception {
[...]
    for (T tuple : tuples) {
        out.collect(tuple);
    }

If I change my code of MR_GPSRS_Reducer to the following, it still shows the same behavior:

public class MR_GPSRS_Reducer<T extends Tuple> extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, Tuple1<T>> {
[...]
@Override
public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, Collector<Tuple1<T>> out) throws Exception {
[...]
    for (T tuple : tuples) {
        out.collect(new Tuple1<T>(tuple));
    }

The same happens here:
public class MR_GPSRS_Reducer<T extends Tuple> extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, ArrayList<T>> {
[...]
public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, Collector<ArrayList<T>> out) throws Exception {
[...]
    out.collect(tuples);

Only if I change the MR_GPSRS_Reducer to the following does the Flink mini cluster start:
public class MR_GPSRS_Reducer<T extends Tuple> extends RichGroupReduceFunction<ArrayList<ArrayList<T>>, Tuple1<ArrayList<T>>> {
[...]
public void reduce(Iterable<ArrayList<ArrayList<T>>> localSkylinesPerPartition, Collector<Tuple1<ArrayList<T>>> out) throws Exception {
[...]
    out.collect(new Tuple1<ArrayList<T>>(tuples));
(The hints about the invalid POJO types still remain.)

But that isn't my preferred format for the data sink...
If I comment out the MR_GPSRS_Reducer (the last transformation), the Flink mini cluster also
starts.
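My guess at why the variants that return T fail, illustrated with plain Java (no Flink involved): the generic parameter T is erased at runtime, so there is nothing for the TypeExtractor to inspect, whereas an anonymous subclass (the same trick Flink's type hints use) keeps its generic arguments in the class file. The class names here are my own, for illustration only:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;

// Analogue of a type hint: an anonymous subclass records its generic
// arguments in the class file, so they survive erasure.
abstract class Hint<T> {
    Type captured() {
        return ((ParameterizedType) getClass().getGenericSuperclass())
                .getActualTypeArguments()[0];
    }
}

public class ErasureDemo {
    // T is erased: at runtime the list is just a raw java.util.ArrayList.
    static <T> String runtimeTypeOf(ArrayList<T> list) {
        return list.getClass().getName();
    }

    public static void main(String[] args) {
        // prints java.util.ArrayList -- the Integer is gone
        System.out.println(runtimeTypeOf(new ArrayList<Integer>()));

        // prints java.util.ArrayList<java.lang.Integer> -- recoverable
        Type t = new Hint<ArrayList<Integer>>() {}.captured();
        System.out.println(t);
    }
}
```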


Does anybody have an idea how I can teach Flink to execute my program with T's as the data
sink type? (In that case, T would be a Tuple10 of Doubles.)
(I have already tried to explicitly typecast the datasets and transformations, so that
suppressing the warnings is no longer necessary.)
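One more thing I am considering (a hypothetical sketch only, not verified on 1.0.1): declaring the output type explicitly with returns(...) on the last operator, so the TypeExtractor does not have to infer the erased T. The TupleTypeInfo construction and the Tuple10-of-Doubles shape below are my assumptions:

```java
// Hypothetical sketch: spell out the erased output type for Flink.
// "input", "metaData", and the operators are the ones from my job above;
// only the returns(...) hint at the end is new.
TypeInformation<Tuple10<Double, Double, Double, Double, Double,
                        Double, Double, Double, Double, Double>> outType =
        new TupleTypeInfo<>(
                BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO,
                BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO,
                BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO,
                BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO,
                BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO);

DataSet<Tuple10<Double, Double, Double, Double, Double,
                Double, Double, Double, Double, Double>> result = input
        .mapPartition(new MR_GPSRS_Mapper()).withBroadcastSet(metaData, "MetaData").withParameters(parameters)
        .reduceGroup(new MR_GPSRS_Reducer()).withBroadcastSet(metaData, "MetaData").withParameters(parameters)
        .returns(outType);
```

Would that be the right direction?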

I'm using <flink.version>1.0.1</flink.version>

Thank you in advance
Robert


