flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Does Flink allows for encapsulation of transformations?
Date Tue, 07 Jun 2016 10:14:56 GMT
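From what I can tell from your code, you are trying to execute a job
within a job: the map() in main calls classPI.compute(), which builds and
runs a second Flink program from inside an operator of the first one.
This just doesn't work.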

Your main method should call compute() directly, like this:

    public static void main(String[] args) throws Exception {
        double pi = new classPI().compute();
        System.out.println("We estimate Pi to be: " + pi);
    }
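Calling compute() from main means its transformations are defined and executed from the client program, not from inside another job's operator. A related way to hide the details is to have a helper only assemble the transformations and return the still-unexecuted result, leaving main to trigger execution exactly once. The sketch below illustrates that shape in plain Java, using java.util.stream as a stand-in because stream pipelines are likewise lazy until a terminal operation. It is not Flink code, and PiPipeline, hits and estimate are hypothetical names; a Flink version of the same idea could take an ExecutionEnvironment (or an input DataSet) as a parameter and return a DataSet.

```java
import java.util.stream.LongStream;

public class PiPipeline {

    // Hypothetical helper: assembles the transformations (generate a sequence,
    // then map each element to a 0/1 hit, as Sampler does) and returns the
    // pipeline WITHOUT running it. LongStream operations are lazy, so nothing
    // is computed until a terminal operation is called.
    static LongStream hits(long numIter) {
        return LongStream.rangeClosed(1, numIter)
                .map(i -> {
                    double x = Math.random();
                    double y = Math.random();
                    return (x * x + y * y) < 1 ? 1L : 0L;
                });
    }

    // The caller owns execution: sum() is the single terminal operation,
    // playing the role of the one collect() that main triggers.
    static double estimate(long numIter) {
        long count = hits(numIter).sum();
        return 4.0 * count / numIter;
    }

    public static void main(String[] args) {
        double pi = estimate(1_000_000);
        System.out.println("We estimate Pi to be: " + pi);
    }
}
```

Because hits() performs no terminal operation, further operations can be chained onto its result before anything runs.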

On 06.06.2016 21:14, Ser Kho wrote:
> The question is how to encapsulate numerous transformations into one
> object, or maybe a function, in an Apache Flink Java setting. I have
> tried to investigate this question using an example of Pi calculation
> (see below). I am wondering whether or not the suggested approach is
> valid from Flink's point of view. It works on one computer; however, I
> do not know how it will behave in a cluster setup. The code is given
> below, and the main idea behind it is as follows:
> 1. Create a class, named classPI, whose method compute() does all
>    data transformations (see more about it below).
> 2. In the main method, create a DataSet as in
>    *DataSet<classPI> opi = env.fromElements(new classPI());*
> 3. Create *DataSet<Double> PI*, which equals the output of a map()
>    transformation that calls the object's method compute(), as in
>    *DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
>    public Double map(classPI objPI) { return objPI.compute(); }});*
> 4. Now about classPI:
>    - The constructor instantiates an ExecutionEnvironment, which is
>      local to this class, as in
>      *public classPI() { this.NumIter = 1000000; env =
>      ExecutionEnvironment.getExecutionEnvironment(); }*
>      Thus, the code has two ExecutionEnvironment objects: one in main
>      and another in the class classPI.
>    - It has a method compute() that runs all data transformations (in
>      this example it is just several lines, but potentially it might
>      contain tons of Flink transformations):
>      *public Double compute() { DataSet<Long> count =
>      env.generateSequence(1, NumIter).map(new Sampler()).reduce(new
>      SumReducer()); PI = 4.0*count.collect().get(0)/NumIter; return PI; }*
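The formula in compute() is the standard Monte Carlo estimate of π. With x and y drawn uniformly from [0, 1), as Math.random() does, a point falls inside the quarter unit circle with probability equal to that region's area, π/4, so four times the fraction of hits estimates π. Its standard error shrinks as 1/sqrt(NumIter):

```latex
P\left(x^2 + y^2 < 1\right) = \frac{\pi}{4}
\quad\Longrightarrow\quad
\hat{\pi} = 4 \cdot \frac{\mathrm{count}}{\mathrm{NumIter}},
\qquad
\operatorname{SE}(\hat{\pi}) = 4\sqrt{\frac{\frac{\pi}{4}\left(1 - \frac{\pi}{4}\right)}{\mathrm{NumIter}}}
\approx 0.0016 \ \text{for } \mathrm{NumIter} = 10^6
```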
> The whole code is given below. Again, the question is whether this is a
> valid approach for encapsulating data transformations in a class in a
> Flink setup that is supposed to be parallelizable on a cluster. Is
> there a better way to hide the details of data transformations?
> Thanks a lot!
> -------------------------The code ----------------------
>     public class PiEstimation {
>         public static void main(String[] args) throws Exception {
>             // this is one ExecutionEnvironment
>             final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>             // this is the critical DataSet with my classPI that computes PI
>             DataSet<classPI> opi = env.fromElements(new classPI());
>             // this map calls the method compute() of class classPI that computes PI
>             DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
>                 public Double map(classPI objPI) throws Exception {
>                     // this is how I call method compute() that calculates PI using transformations
>                     return objPI.compute();
>                 }
>             });
>             double pi = PI.collect().get(0);
>             System.out.println("We estimate Pi to be: " + pi);
>         }
>
>         // this class is of no importance for my question, however, it is relevant for pi calculation
>         public static class Sampler implements MapFunction<Long, Long> {
>             @Override
>             public Long map(Long value) {
>                 double x = Math.random();
>                 double y = Math.random();
>                 return (x * x + y * y) < 1 ? 1L : 0L;
>             }
>         }
>
>         // this class is of no importance for my question, however, it is relevant for pi calculation
>         public static final class SumReducer implements ReduceFunction<Long> {
>             @Override
>             public Long reduce(Long value1, Long value2) {
>                 return value1 + value2;
>             }
>         }
>
>         // this is my class that computes PI; is such a class valid in Flink on a cluster with parallel computation?
>         public static final class classPI {
>             public Integer NumIter;
>             private final ExecutionEnvironment env;
>             public Double PI;
>
>             // this is the constructor with another ExecutionEnvironment
>             public classPI() {
>                 this.NumIter = 1000000;
>                 env = ExecutionEnvironment.getExecutionEnvironment();
>             }
>
>             // this is the method that contains all data transformations
>             public Double compute() throws Exception {
>                 DataSet<Long> count = env.generateSequence(1, NumIter)
>                         .map(new Sampler())
>                         .reduce(new SumReducer());
>                 PI = 4.0 * count.collect().get(0) / NumIter;
>                 return PI;
>             }
>         }
>     }
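The question above is about parallel execution on a cluster. The per-element logic of Sampler and SumReducer parallelizes cleanly: each sample is independent, and because addition is associative and commutative, partial counts from separate partitions can be combined in any grouping, which is the property a parallel reduce relies on. The plain-Java sketch below has no Flink dependency; PartitionedPi, sample, sumReduce and estimate are hypothetical names. It copies that logic, pre-reduces each partition locally, and then merges the partial counts with the same sum.

```java
import java.util.stream.IntStream;
import java.util.stream.LongStream;

public class PartitionedPi {

    // Same per-element logic as Sampler: 1 if the random point
    // falls inside the quarter unit circle, otherwise 0.
    static long sample() {
        double x = Math.random();
        double y = Math.random();
        return (x * x + y * y) < 1 ? 1L : 0L;
    }

    // Same logic as SumReducer: addition is associative and commutative,
    // so partial counts can be merged in any order or grouping.
    static long sumReduce(long a, long b) {
        return a + b;
    }

    // Hypothetical stand-in for a parallel run: each "partition" samples its
    // share of numIter points and pre-reduces locally, then the partial
    // counts are merged with the same sumReduce.
    static double estimate(long numIter, int partitions) {
        if (partitions <= 0) {
            throw new IllegalArgumentException("partitions must be positive");
        }
        long perPartition = numIter / partitions;
        long count = IntStream.range(0, partitions)
                .parallel()
                .mapToLong(p -> {
                    // the last partition also takes the remainder
                    long n = (p == partitions - 1)
                            ? numIter - perPartition * (partitions - 1)
                            : perPartition;
                    return LongStream.range(0, n)
                            .map(i -> sample())
                            .reduce(0L, PartitionedPi::sumReduce);
                })
                .reduce(0L, PartitionedPi::sumReduce);
        return 4.0 * count / numIter;
    }

    public static void main(String[] args) {
        System.out.println("We estimate Pi to be: " + estimate(1_000_000, 4));
    }
}
```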
