flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Does Flink allows for encapsulation of transformations?
Date Fri, 10 Jun 2016 08:18:29 GMT
Q1:
Whether one of your classes requires the *env* parameter depends on
whether you want to create a new Source or set an ExecutionEnvironment
parameter inside the class.
If you don't, you can of course just not pass it :)
I can't see anything that would prevent it from running on a cluster.
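To make the distinction concrete, here is a minimal sketch (the class
names are placeholders, not from this thread): a class only needs env if
it creates a new source; one that merely appends transformations to an
incoming DataSet does not.

// needs env: it creates a new source via generateSequence()
public static final class NeedsEnv {
    private final ExecutionEnvironment env;
    public NeedsEnv(ExecutionEnvironment env) { this.env = env; }
    public DataSet<Long> numbers() {
        return this.env.generateSequence(1, 100);
    }
}

// does not need env: it only appends a transformation to its input
public static final class NoEnvNeeded {
    public DataSet<Long> doubled(DataSet<Long> input) {
        return input.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) { return value * 2L; }
        });
    }
}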

Q2:
Usually, parameters are passed to a UDF through the constructor. You
/can/ use a DataSet within a function initializer block, but it's rather
unusual (this is in fact the first time I've seen it done this way).

You can also just pass a long into the constructor; there is no need to
use a DataSet and collect().
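If the value genuinely had to stay a DataSet, a broadcast set would be
the idiomatic alternative to calling collect() inside the function. A
rough sketch (the broadcast name "numIter" and the surrounding pipeline
are assumptions, not code from this thread; it needs the
RichMapFunction, Configuration and java.util.List imports):

DataSet<Double> pi = env.generateSequence(1, 1000000)
    .map(new Sampler())
    .reduce(new SumReducer())
    .map(new RichMapFunction<Long, Double>() {
        private long n;

        @Override
        public void open(Configuration parameters) throws Exception {
            // the broadcast DataSet arrives as a List on every parallel instance
            List<Long> bc = getRuntimeContext().getBroadcastVariable("numIter");
            n = bc.get(0);
        }

        @Override
        public Double map(Long count) throws Exception {
            return count * 4.0 / n;
        }
    })
    .withBroadcastSet(NumIter, "numIter");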

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Double> Radius = env.fromElements(10.0);
final long numIter = 1000000L;
DataSet<Double> pi = new classPI(env).compute(numIter);
DataSet<Double> LengthCircle = new classLengthCircle().computeLengthCircle(pi, Radius);

public static final class classPI implements Serializable {
    private final ExecutionEnvironment env;

    public classPI(ExecutionEnvironment env) {
        this.env = env;
    }

    public DataSet<Double> compute(final long numIter) throws Exception {
        return this.env.generateSequence(1, numIter)
            .map(new Sampler())
            .reduce(new SumReducer())
            .map(new MapFunction<Long, Double>() {
                @Override
                public Double map(Long arg0) throws Exception {
                    return arg0 * 4.0 / numIter;
                }
            });
    }
}
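
The same final step, with the divisor passed through a named UDF's
constructor instead of being captured by the anonymous class, could look
like this (a minimal sketch; the name PiMapper is a placeholder):

public static final class PiMapper implements MapFunction<Long, Double> {
    private final long numIter;

    // the parameter travels with the serialized function object to the cluster
    public PiMapper(long numIter) {
        this.numIter = numIter;
    }

    @Override
    public Double map(Long count) throws Exception {
        return count * 4.0 / numIter;
    }
}

// usage: ... .reduce(new SumReducer()).map(new PiMapper(numIter));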

Regards,
Chesnay


On 10.06.2016 02:46, Ser Kho wrote:
>> Chesnay:
>> I have two simple questions, related to the previous ones about 
>> encapsulation of transformations.
>>
>> Question 1. I have tried to extend my code using your suggestions and 
>> come up with a small concern. First, your code:
> public static void main(String[] args) throws Exception {
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     DataSet<Double> pi = new classPI(env).compute();
>     new classThatNeedsPI(env).computeWhatever(pi); // append your transformations to pi
>     env.execute();
> }
> Below is my code (the bold lines are very similar and work ok). The
> line of concern is marked by a comment in the code below. The issue is
> that I do not use *env* in the constructor of the class
> classLengthCircle(); instead I use DataSet *pi* in the method
> computeLengthCircle(pi, Radius), and also DataSet Radius, but the
> latter does not matter for the question. Then, I proceed with
> transformations using this DataSet *pi*; see the class
> classLengthCircle below. It seems that the logic of this class and its
> method computeLengthCircle() does not require env at all. My question
> is whether this code will work on a cluster (it does work on a local
> computer)?
>
> final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> DataSet<Double> Radius = env.fromElements(10.0);
> DataSet<Long> NumIter = env.fromElements(1000000L);
> // this line is similar to the suggested
> DataSet<Double> pi = new classPI(env).compute(NumIter);
> // this line is somewhat different from the suggested, as it has no env in the constructor
> DataSet<Double> LengthCircle = new classLengthCircle().computeLengthCircle(pi, Radius);
> =========================
> public static final class classLengthCircle {
>     public DataSet<Double> computeLengthCircle(DataSet<Double> pi, DataSet<Double> Radius) {
>         DataSet<Double> result = pi.cross(Radius).map(
>             new MapFunction<Tuple2<Double, Double>, Double>() {
>                 @Override
>                 public Double map(Tuple2<Double, Double> arg0) throws Exception {
>                     return 2 * arg0.f0 * arg0.f1;
>                 }
>             });
>         return result;
>     }
> }
>> ================================================
>> Question 2:
>> I tried to pass a parameter DataSet NumIter into the MapFunction of
>> the transformation map(); see the marked line in the code below. It
>> seems this parameter appears in the MapFunction without explicit
>> passing, since the line
>>  .map(new MapFunction<Long, Double>()
>> makes no mention of NumIter.
>> Is the suggested approach a right way to pass a parameter inside the
>> transformation MapFunction?
>> Note that the code works all right on a single computer.
>>
>> public static final class classPI implements Serializable {
>>     private final ExecutionEnvironment env;
>>
>>     public classPI(ExecutionEnvironment env) { this.env = env; }
>>
>>     public DataSet<Double> compute(final DataSet<Long> NumIter) throws Exception {
>>         return this.env.generateSequence(1, NumIter.collect().get(0))
>>             .map(new Sampler())
>>             .reduce(new SumReducer())
>>             .map(new MapFunction<Long, Double>() {
>>                 Long N = NumIter.collect().get(0);  // <-- the parameter in question
>>                 @Override
>>                 public Double map(Long arg0) throws Exception {
>>                     return arg0 * 4.0 / N;
>>                 }
>>             });
>>     }
>> }
>
>> Thanks a lot for your time.
>> Ser
>
>
>
> On Tuesday, June 7, 2016 8:14 AM, Chesnay Schepler 
> <chesnay@apache.org> wrote:
>
>
> 1a. ah. yeah i see how it could work, but i wouldn't count on it in a
> cluster.
> you would (most likely) run the sub-job (calculating pi) only on a
> single node.
>
> 1b. different execution environments generally imply different flink 
> programs.
>
> 2. sure it does, since it's a normal flink job. yours on the other 
> hand doesn't, since the job calculating PI only runs on a single 
> TaskManager.
>
> 3. there are 2 ways. you can either chain jobs like this (effectively
> running 2 flink programs in succession):
> public static void main(String[] args) throws Exception {
>     double pi = new classPI().compute();
>     System.out.println("We estimate Pi to be: " + pi);
>     new classThatNeedsPI().computeWhatever(pi); // feeds pi into an env.fromElements call and proceeds from there
> }
> or (if all building blocks are flink programs) build a single job:
> public static void main(String[] args) throws Exception {
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     DataSet<Double> pi = new classPI(env).compute();
>     new classThatNeedsPI(env).computeWhatever(pi); // append your transformations to pi
>     env.execute();
> }
> ...
> public DataSet<Double> compute() throws Exception {
>     return this.env.generateSequence(1, NumIter)
>         .map(new Sampler())
>         .reduce(new SumReducer())
>         .map(/* return 4 * x */);
> }
> ...
> public ? computeWhatever(DataSet<Long> pi) throws Exception { ... }
>
> On 07.06.2016 13:35, Ser Kho wrote:
>> Chesnay:
>> 1a. The code actually works, that is the point.
>> 1b. What restricts a Flink program from having several execution
>> environments?
>> 2. I am not sure that your modification allows for parallelism. Does it?
>> 3. This code is a simple example of writing/organizing large and
>> complicated programs, where the result of this pi needs to be used
>> in other DataSet transformations beyond classPI(). What to do in
>> this case?
>> Thanks a lot for the suggestions.
>>
>>
>> On Tuesday, June 7, 2016 6:15 AM, Chesnay Schepler
>> <chesnay@apache.org> wrote:
>>
>>
>> from what i can tell from your code you are trying to execute a job 
>> within a job. This just doesn't work.
>>
>> your main method should look like this:
>>
>> public static void main(String[] args) throws Exception {
>>     double pi = new classPI().compute();
>>     System.out.println("We estimate Pi to be: " + pi);
>> }
>>
>>
>>
>> On 06.06.2016 21:14, Ser Kho wrote:
>>> The question is how to encapsulate numerous transformations into one
>>> object, or maybe a function, in an Apache Flink Java setting. I have
>>> tried to investigate this question using an example of Pi
>>> calculation (see below). I am wondering whether or not the suggested
>>> approach is valid from Flink's point of view. It works on one
>>> computer; however, I do not know how it will behave in a cluster
>>> setup. The code is given below, and the main idea behind it is as follows:
>>>
>>>  1. Create a class, named classPI, whose method compute() does all
>>>     the data transformations; see more about it below.
>>>  2. In the main method create a DataSet as in
>>>     DataSet<classPI> opi = env.fromElements(new classPI());
>>>  3. Create DataSet<Double> PI, which equals the output of the
>>>     transformation map() that calls the object PI's method compute(),
>>>     as in
>>>     DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
>>>         public Double map(classPI objPI) { return objPI.compute(); }});
>>>  4. Now about classPI:
>>>      *  The constructor instantiates an ExecutionEnvironment, which
>>>         is local to this class, as in
>>>         public classPI() { this.NumIter = 1000000; env =
>>>         ExecutionEnvironment.getExecutionEnvironment(); }
>>>
>>> Thus, the code has two ExecutionEnvironment objects: one in main and 
>>> another in the class classPI.
>>>
>>>  *  Has a method compute() that runs all the data transformations
>>>     (in this example it is just several lines, but potentially it
>>>     might contain tons of Flink transformations):
>>>     public Double compute() throws Exception {
>>>         DataSet<Long> count = env.generateSequence(1, NumIter)
>>>             .map(new Sampler()).reduce(new SumReducer());
>>>         PI = 4.0 * count.collect().get(0) / NumIter;
>>>         return PI; }
>>>
>>> the whole code is given below. Again, the question is whether this
>>> is a valid approach for encapsulating data transformations into a
>>> class in a Flink setup that is supposed to be parallelizable to work
>>> on a cluster. Is there a better way to hide the details of data
>>> transformations?
>>> Thanks a lot!
>>>
>>> -------------------------The code ----------------------
>>>
>>> public class PiEstimation {
>>>
>>>     public static void main(String[] args) throws Exception {
>>>         // this is one ExecutionEnvironment
>>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>         // this is the critical DataSet with my classPI that computes PI
>>>         DataSet<classPI> opi = env.fromElements(new classPI());
>>>         // this map calls the method compute() of class classPI that computes PI
>>>         DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
>>>             public Double map(classPI objPI) throws Exception {
>>>                 // this is how I call method compute() that calculates PI using transformations
>>>                 return objPI.compute();
>>>             }
>>>         });
>>>         double pi = PI.collect().get(0);
>>>         System.out.println("We estimate Pi to be: " + pi);
>>>     }
>>>
>>>     // this class is of no importance for my question, however, it is relevant for pi calculation
>>>     public static class Sampler implements MapFunction<Long, Long> {
>>>         @Override
>>>         public Long map(Long value) {
>>>             double x = Math.random();
>>>             double y = Math.random();
>>>             return (x * x + y * y) < 1 ? 1L : 0L;
>>>         }
>>>     }
>>>
>>>     // this class is of no importance for my question, however, it is relevant for pi calculation
>>>     public static final class SumReducer implements ReduceFunction<Long> {
>>>         @Override
>>>         public Long reduce(Long value1, Long value2) {
>>>             return value1 + value2;
>>>         }
>>>     }
>>>
>>>     // this is my class that computes PI; my question is whether such a class
>>>     // is valid in Flink on a cluster with parallel computation
>>>     public static final class classPI {
>>>         public Integer NumIter;
>>>         private final ExecutionEnvironment env;
>>>         public Double PI;
>>>
>>>         // this is the constructor with another ExecutionEnvironment
>>>         public classPI() {
>>>             this.NumIter = 1000000;
>>>             env = ExecutionEnvironment.getExecutionEnvironment();
>>>         }
>>>
>>>         // this is the method that contains all data transformations
>>>         public Double compute() throws Exception {
>>>             DataSet<Long> count = env.generateSequence(1, NumIter)
>>>                 .map(new Sampler())
>>>                 .reduce(new SumReducer());
>>>             PI = 4.0 * count.collect().get(0) / NumIter;
>>>             return PI;
>>>         }
>>>     }
>>> }
>>
>>
>>
>
>
>

