flink-user mailing list archives

From Ser Kho <khov2...@yahoo.com>
Subject Re: Does Flink allows for encapsulation of transformations?
Date Tue, 07 Jun 2016 11:35:53 GMT
Chesnay:
1a. The code actually works; that is the point.
1b. What prevents a Flink program from having several execution environments?
2. I am not sure that your modification allows for parallelism. Does it?
3. This code is a simple example of how to write and organize large, complicated programs,
where the result of this Pi computation needs to be used in other DataSet transformations
beyond classPI(). What should be done in that case?
Thanks a lot for the suggestions.

    On Tuesday, June 7, 2016 6:15 AM, Chesnay Schepler <chesnay@apache.org> wrote:
 

  From what I can tell from your code, you are trying to execute a job within a job. This just
doesn't work.
 
 Your main method should look like this:
 
 public static void main(String[] args) throws Exception {
     double pi = new classPI().compute();
     System.out.println("We estimate Pi to be: " + pi);
 }
 
 
 On 06.06.2016 21:14, Ser Kho wrote:
  
  The question is how to encapsulate numerous transformations into one object, or maybe a
function, in an Apache Flink Java setting. I have tried to investigate this question using an
example of a Pi calculation (see below). I am wondering whether the suggested approach is
valid from Flink's point of view. It works on one computer; however, I do not know how it
will behave in a cluster setup. The code is given below, and the main idea behind it is as
follows:
   - Create a class, named classPI, whose method compute() does all the data transformations
     (see more about it below).
   - In the main method, create a DataSet, as in
       DataSet<classPI> opi = env.fromElements(new classPI());
   - Create a DataSet<Double> PI that equals the output of a map() transformation, which
     calls the compute() method of the classPI object, as in
       DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
           public Double map(classPI objPI) { return objPI.compute(); }});
   - Now about classPI:
      - Its constructor instantiates an ExecutionEnvironment that is local to this class, as in
          public classPI(){ this.NumIter=1000000; env = ExecutionEnvironment.getExecutionEnvironment();}
        Thus, the code has two ExecutionEnvironment objects: one in main and another in the
        class classPI.
      - It has a method compute() that runs all the data transformations (in this example it is
        just several lines, but potentially it might contain tons of Flink transformations):
          public Double compute(){
              DataSet<Long> count = env.generateSequence(1, NumIter).map(new Sampler()).reduce(new SumReducer());
              PI = 4.0*count.collect().get(0)/NumIter;
              return PI;}
 The whole code is given below. Again, the question is whether this is a valid approach to
encapsulating data transformations in a class in a Flink setup that is supposed to be
parallelizable and work on a cluster. Is there a better way to hide the details of data
transformations? Thanks a lot!
  -------------------------The code ---------------------- 
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PiEstimation {

    public static void main(String[] args) throws Exception {
        // this is one ExecutionEnvironment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // this is the critical DataSet, holding my classPI that computes PI
        DataSet<classPI> opi = env.fromElements(new classPI());
        // this map calls the method compute() of class classPI that computes PI
        DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
            public Double map(classPI objPI) throws Exception {
                // this is how I call method compute() that calculates PI using transformations
                return objPI.compute();
            }
        });

        double pi = PI.collect().get(0);
        System.out.println("We estimate Pi to be: " + pi);
    }

    // this class is of no importance for my question; however, it is relevant for the pi calculation
    public static class Sampler implements MapFunction<Long, Long> {
        @Override
        public Long map(Long value) {
            double x = Math.random();
            double y = Math.random();
            return (x * x + y * y) < 1 ? 1L : 0L;
        }
    }

    // this class is of no importance for my question; however, it is relevant for the pi calculation
    public static final class SumReducer implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long value1, Long value2) {
            return value1 + value2;
        }
    }

    // this is my class that computes PI; my question is whether such a class is valid in Flink
    // on a cluster with parallel computation
    public static final class classPI {
        public Integer NumIter;
        private final ExecutionEnvironment env;
        public Double PI;

        // this is the constructor, with another ExecutionEnvironment
        public classPI() {
            this.NumIter = 1000000;
            env = ExecutionEnvironment.getExecutionEnvironment();
        }

        // this is the method that contains all the data transformations
        public Double compute() throws Exception {
            DataSet<Long> count = env.generateSequence(1, NumIter)
                                     .map(new Sampler())
                                     .reduce(new SumReducer());
            PI = 4.0 * count.collect().get(0) / NumIter;
            return PI;
        }
    }
}
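
For reference, the arithmetic that compute() performs (sample random points, sum the hits, scale by 4/NumIter) can be sanity-checked without a Flink installation. The following is only a plain-Java sketch: it swaps the DataSet pipeline for java.util.stream, and the names PiSketch, sample, estimate and the seed parameter are assumptions made for illustration, not part of the program above. It says nothing about how the Flink version behaves on a cluster.

```java
import java.util.Random;
import java.util.stream.LongStream;

// Plain-Java stand-in for the Flink pipeline; no Flink classes are used here.
final class PiSketch {

    // Counterpart of Sampler: 1 if a random point lands inside the unit quarter circle, else 0.
    static long sample(Random rnd) {
        double x = rnd.nextDouble();
        double y = rnd.nextDouble();
        return (x * x + y * y) < 1 ? 1L : 0L;
    }

    // Counterpart of compute(): sum the hits over numIter samples, then scale by 4 / numIter.
    static double estimate(long numIter, long seed) {
        Random rnd = new Random(seed);  // fixed seed (an assumption) so that runs are repeatable
        long hits = LongStream.rangeClosed(1, numIter)
                              .map(i -> sample(rnd))  // sequential stream, so sharing rnd is safe
                              .sum();
        return 4.0 * hits / numIter;
    }

    public static void main(String[] args) {
        System.out.println("We estimate Pi to be: " + estimate(1_000_000L, 42L));
    }
}
```

With one million samples the estimate should land within a few thousandths of Math.PI; the exact digits depend on the seed.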
 
 

  