flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Does Flink allows for encapsulation of transformations?
Date Tue, 07 Jun 2016 12:14:04 GMT
1a. ah, yeah, i see how it could work, but i wouldn't count on it in a
cluster setup: you would (most likely) run the sub-job (calculating pi)
only on a single node.

1b. different execution environments generally imply different flink programs.

2. sure it does, since it's a normal flink job. yours on the other hand 
doesn't, since the job calculating PI only runs on a single TaskManager.
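To illustrate what point 2 means by a "normal" parallel job: the sample range is split across workers, each worker draws its own samples, and a reduce combines the partial counts. The sketch below is not Flink code. It uses Java parallel streams as a stand-in for Flink's parallel subtasks (an assumption made only to keep it self-contained and runnable), and the class name ParallelPi is hypothetical.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.LongStream;

public class ParallelPi {
    // Same test as the Sampler class in this thread: 1 if a random point in
    // the unit square falls inside the quarter circle, otherwise 0.
    static long sample(long ignored) {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        double x = rnd.nextDouble();
        double y = rnd.nextDouble();
        return (x * x + y * y) < 1 ? 1L : 0L;
    }

    // The range 1..numIter is split across worker threads (roughly what
    // generateSequence followed by a parallel map does across subtasks), and
    // sum() combines the per-thread partial counts (the role of SumReducer).
    static double estimatePi(long numIter) {
        long hits = LongStream.rangeClosed(1, numIter)
                .parallel()
                .map(ParallelPi::sample)
                .sum();
        return 4.0 * hits / numIter;
    }

    public static void main(String[] args) {
        System.out.println("We estimate Pi to be: " + estimatePi(1_000_000));
    }
}
```

The nested-job version in the original question, by contrast, puts all of this work behind a single input element, so only one worker ever receives it.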

3. there are 2 ways. you can either chain jobs like this (effectively
running 2 flink programs in succession):

    public static void main(String[] args) throws Exception {
        double pi = new classPI().compute();
        System.out.println("We estimate Pi to be: " + pi);
        new classThatNeedsPI().computeWhatever(pi); // feeds pi into an env.fromElements call and proceeds from there
    }
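A minimal sketch of the chaining approach, again with plain Java streams standing in for the two Flink programs (an assumption for illustration, not Flink's API). The first program runs to completion and hands back an ordinary double, the way collect() does; the second starts a fresh pipeline that takes that value as a normal parameter. computeWhatever is Chesnay's placeholder name, and the circle-area calculation inside it is a hypothetical example of "something that needs pi".

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class ChainedPrograms {
    // "Program 1": runs to completion and returns a plain double,
    // analogous to calling collect() on the Flink result.
    static double computePi(long numIter) {
        long hits = LongStream.rangeClosed(1, numIter)
                .parallel()
                .map(i -> {
                    double x = ThreadLocalRandom.current().nextDouble();
                    double y = ThreadLocalRandom.current().nextDouble();
                    return (x * x + y * y) < 1 ? 1L : 0L;
                })
                .sum();
        return 4.0 * hits / numIter;
    }

    // "Program 2" (hypothetical computeWhatever): a new pipeline seeded with
    // pi as an ordinary value, here computing circle areas for some radii.
    static List<Double> computeWhatever(double pi, List<Double> radii) {
        return radii.stream()
                .map(r -> pi * r * r)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        double pi = computePi(1_000_000); // the first program finishes here
        System.out.println("We estimate Pi to be: " + pi);
        System.out.println(computeWhatever(pi, List.of(1.0, 2.0))); // second program
    }
}
```

The cost of this design is that the first program's result travels back to the client before the second program starts; the upside is that each building block stays an independent, simple program.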

or (if all building blocks are flink programs) build a single job:

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Double> pi = new classPI(env).compute();
        new classThatNeedsPI(env).computeWhatever(pi); // append your transformations to pi
        env.execute();
    }
    ...
    public DataSet<Double> compute() throws Exception {
        return env.generateSequence(1, NumIter).map(new Sampler())
                .reduce(new SumReducer()).map(/* return 4.0 * x / NumIter */);
    }
    ...
    public ? computeWhatever(DataSet<Double> pi) throws Exception { ... }
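The key property of the single-job approach is that each building block only appends transformations and returns a handle; nothing runs until one terminal call at the end (env.execute() in Flink). The sketch below demonstrates that with Java streams as a stand-in for DataSet (an assumption, since Java streams are also lazy until a terminal operation). piPipeline, computeWhatever, and the SAMPLES_DRAWN counter are hypothetical names; the counter exists only to show that no sampling happens while the pipeline is being assembled. In Flink, pairing pi with every radius would more likely be done with something like cross or a broadcast set; the flatMap here is just the stream analog.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.DoubleStream;
import java.util.stream.LongStream;

public class SingleJob {
    // Counts samples actually drawn, so we can observe when work happens.
    static final AtomicLong SAMPLES_DRAWN = new AtomicLong();

    // Building block 1 (like classPI(env).compute()): returns a lazy
    // one-element stream for the pi estimate. Calling this draws no samples;
    // sampling runs only when a terminal operation pulls the element.
    static DoubleStream piPipeline(long numIter) {
        return LongStream.of(numIter).mapToDouble(n -> {
            long hits = LongStream.rangeClosed(1, n)
                    .map(i -> {
                        SAMPLES_DRAWN.incrementAndGet();
                        double x = ThreadLocalRandom.current().nextDouble();
                        double y = ThreadLocalRandom.current().nextDouble();
                        return (x * x + y * y) < 1 ? 1L : 0L;
                    })
                    .sum();
            return 4.0 * hits / n;
        });
    }

    // Building block 2 (hypothetical computeWhatever): appends more lazy
    // transformations to the pi pipeline, pairing pi with each radius.
    static DoubleStream computeWhatever(DoubleStream pi, List<Double> radii) {
        return pi.flatMap(p -> radii.stream().mapToDouble(r -> p * r * r));
    }

    public static void main(String[] args) {
        DoubleStream plan = computeWhatever(piPipeline(1_000_000), List.of(1.0, 2.0));
        System.out.println("samples drawn before execution: " + SAMPLES_DRAWN.get());
        double[] areas = plan.toArray(); // the single "execute" at the end
        System.out.println("samples drawn after execution: " + SAMPLES_DRAWN.get());
        System.out.println(Arrays.toString(areas));
    }
}
```

Because every block receives and returns a pipeline handle rather than a computed value, there is exactly one environment and one execution, which is what keeps the whole thing parallelizable as one job.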

On 07.06.2016 13:35, Ser Kho wrote:
> Chesnay:
> 1a. The code actually works, that is the point.
> 1b. What restricts a Flink program from having several execution
> environments?
> 2. I am not sure that your modification allows for parallelism. Does it?
> 3. This code is a simple example of how to write/organize large and
> complicated programs, where the result (pi) needs to be used in
> other DataSet transformations beyond classPI(). What should I do in this case?
> Thanks a lot for the suggestions.
> On Tuesday, June 7, 2016 6:15 AM, Chesnay Schepler 
> <chesnay@apache.org> wrote:
> from what i can tell from your code you are trying to execute a job 
> within a job. This just doesn't work.
> your main method should look like this:
>     public static void main(String[] args) throws Exception {
>         double pi = new classPI().compute();
>         System.out.println("We estimate Pi to be: " + pi);
>     }
> On 06.06.2016 21:14, Ser Kho wrote:
>> The question is how to encapsulate numerous transformations into one
>> object, or maybe a function, in an Apache Flink Java setting. I have
>> tried to investigate this question using an example of Pi calculation
>> (see below). I am wondering whether or not the suggested approach is
>> valid from Flink's point of view. It works on one computer;
>> however, I do not know how it will behave in a cluster setup. The
>> code is given below, and the main idea behind it is as follows:
>>  1. Create a class, named classPI, whose method compute() does all
>>     data transformations (see more about it below).
>>  2. In the main method, create a DataSet as in
>>     *DataSet<classPI> opi = env.fromElements(new classPI());*
>>  3. Create *DataSet<Double> PI*, which equals the output of a map()
>>     transformation that calls the classPI object's method compute(),
>>     as in
>>     *DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
>>     public Double map(classPI objPI) { return objPI.compute(); }});*
>>  4. Now about classPI:
>>      * Its constructor instantiates an ExecutionEnvironment, which is
>>        local to this class, as in
>>        *public classPI() { this.NumIter = 1000000; env =
>>        ExecutionEnvironment.getExecutionEnvironment(); }*
>>        Thus, the code has two ExecutionEnvironment objects: one in main
>>        and another in the class classPI.
>>      * Its method compute() runs all data transformations (in this
>>        example it is just several lines, but potentially it might contain
>>        tons of Flink transformations):
>>        *public Double compute() { DataSet<Long> count = env.generateSequence(1,
>>        NumIter).map(new Sampler()).reduce(new SumReducer()); PI =
>>        4.0 * count.collect().get(0) / NumIter; return PI; }*
>> The whole code is given below. Again, the question is whether this is a
>> valid approach for encapsulating data transformations in a class
>> in a Flink setup that is supposed to be parallelizable on a
>> cluster. Is there a better way to hide the details of data transformations?
>> Thanks a lot!
>> -------------------------The code ----------------------
>>     import org.apache.flink.api.common.functions.MapFunction;
>>     import org.apache.flink.api.common.functions.ReduceFunction;
>>     import org.apache.flink.api.java.DataSet;
>>     import org.apache.flink.api.java.ExecutionEnvironment;
>>
>>     public class PiEstimation {
>>         public static void main(String[] args) throws Exception {
>>             // this is one ExecutionEnvironment
>>             final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>             // this is the critical DataSet with my classPI that computes PI
>>             DataSet<classPI> opi = env.fromElements(new classPI());
>>             // this map calls the method compute() of class classPI that computes PI
>>             DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
>>                 public Double map(classPI objPI) throws Exception {
>>                     // this is how I call method compute() that calculates PI using transformations
>>                     return objPI.compute();
>>                 }
>>             });
>>             double pi = PI.collect().get(0);
>>             System.out.println("We estimate Pi to be: " + pi);
>>         }
>>         // this class is of no importance for my question; however, it is relevant for pi calculation
>>         public static class Sampler implements MapFunction<Long, Long> {
>>             @Override
>>             public Long map(Long value) {
>>                 double x = Math.random();
>>                 double y = Math.random();
>>                 return (x * x + y * y) < 1 ? 1L : 0L;
>>             }
>>         }
>>         // this class is of no importance for my question; however, it is relevant for pi calculation
>>         public static final class SumReducer implements ReduceFunction<Long> {
>>             @Override
>>             public Long reduce(Long value1, Long value2) {
>>                 return value1 + value2;
>>             }
>>         }
>>         // this is my class that computes PI; my question is whether such a class
>>         // is valid in Flink on a cluster with parallel computation
>>         public static final class classPI {
>>             public Integer NumIter;
>>             private final ExecutionEnvironment env;
>>             public Double PI;
>>             // this is the constructor with another ExecutionEnvironment
>>             public classPI() {
>>                 this.NumIter = 1000000;
>>                 env = ExecutionEnvironment.getExecutionEnvironment();
>>             }
>>             // this is the method that contains all data transformations
>>             public Double compute() throws Exception {
>>                 DataSet<Long> count = env.generateSequence(1, NumIter)
>>                         .map(new Sampler())
>>                         .reduce(new SumReducer());
>>                 PI = 4.0 * count.collect().get(0) / NumIter;
>>                 return PI;
>>             }
>>         }
>>     }
