flink-user mailing list archives

From Ser Kho <khov2...@yahoo.com>
Subject Does Flink allows for encapsulation of transformations?
Date Mon, 06 Jun 2016 19:14:02 GMT
The question is how to encapsulate numerous transformations into one object, or maybe a function,
in an Apache Flink Java setting. I have tried to investigate this question using an example of
Pi calculation (see below). I am wondering whether or not the suggested approach is valid
from Flink's point of view. It works on one computer; however, I do not know how it will
behave in a cluster setup. The code is given below, and the main idea behind it is as follows:
  
   - Create a class, named classPI, whose method compute() does all data transformations (see
     more about it below).
   - In the main method, create a DataSet as in
     DataSet<classPI> opi = env.fromElements(new classPI());
   - Create DataSet<Double> PI, which equals the output of a map() transformation that calls the
     method compute() of the classPI object, as in
     DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() { public Double map(classPI objPI) { return objPI.compute(); }});
   - Now about classPI:
      - Its constructor instantiates an ExecutionEnvironment that is local to this class, as in
        public classPI() { this.NumIter = 1000000; env = ExecutionEnvironment.getExecutionEnvironment(); }
        Thus, the code has two ExecutionEnvironment objects: one in main and another in the class classPI.
      - It has a method compute() that runs all data transformations (in this example it is just
        several lines, but potentially it might contain tons of Flink transformations):
        public Double compute() { DataSet<Long> count = env.generateSequence(1, NumIter).map(new Sampler()).reduce(new SumReducer());
        PI = 4.0 * count.collect().get(0) / NumIter; return PI; }
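For readers less familiar with the estimate itself, the arithmetic inside compute() can be sketched in plain Java without Flink: the fraction of random points in the unit square that fall inside the quarter circle approaches pi/4, so 4 * hits / NumIter approximates pi. This is only an illustration of the formula, not of the Flink job; the class MonteCarloPi, its methods, and the seeded java.util.Random (used so runs are reproducible, whereas Sampler uses Math.random()) are assumptions introduced here.

```java
import java.util.Random;

// Plain-Java sketch (no Flink, hypothetical names) of the arithmetic in compute():
// sample points in [0,1) x [0,1), count those inside the quarter circle, scale by 4.
final class MonteCarloPi {

    // Counts how many of numIter random points satisfy x^2 + y^2 < 1.
    // Together this mirrors what Sampler (map) and SumReducer (reduce) do.
    static long countHits(long numIter, Random rnd) {
        long hits = 0;
        for (long i = 0; i < numIter; i++) {
            double x = rnd.nextDouble();
            double y = rnd.nextDouble();
            if (x * x + y * y < 1) {
                hits++;
            }
        }
        return hits;
    }

    // The quarter circle covers pi/4 of the unit square, so pi ~ 4 * hits / numIter.
    static double estimatePi(long numIter, long seed) {
        if (numIter <= 0) {
            throw new IllegalArgumentException("numIter must be positive");
        }
        long hits = countHits(numIter, new Random(seed));
        return 4.0 * hits / numIter;
    }

    public static void main(String[] args) {
        System.out.println("We estimate Pi to be: " + estimatePi(1_000_000L, 42L));
    }
}
```

With one million samples the statistical error of this estimate is roughly 0.002, which is why the Flink version also needs a large NumIter to get a few correct digits.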
The whole code is given below. Again, the question is whether this is a valid approach for encapsulating
data transformations into a class in a Flink setup that is supposed to be parallelizable and
run on a cluster. Is there a better way to hide the details of data transformations? Thanks a
lot!
-------------------------The code ----------------------
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PiEstimation {

    public static void main(String[] args) throws Exception {
        // this is one ExecutionEnvironment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // this is the critical DataSet with my classPI that computes PI
        DataSet<classPI> opi = env.fromElements(new classPI());
        // this map calls the method compute() of class classPI that computes PI
        DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
            @Override
            public Double map(classPI objPI) throws Exception {
                // this is how I call the method compute() that calculates PI using transformations
                return objPI.compute();
            }
        });

        double pi = PI.collect().get(0);
        System.out.println("We estimate Pi to be: " + pi);
    }

    // this class is of no importance for my question; however, it is relevant for the Pi calculation
    public static class Sampler implements MapFunction<Long, Long> {
        @Override
        public Long map(Long value) {
            double x = Math.random();
            double y = Math.random();
            return (x * x + y * y) < 1 ? 1L : 0L;
        }
    }

    // this class is of no importance for my question; however, it is relevant for the Pi calculation
    public static final class SumReducer implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long value1, Long value2) {
            return value1 + value2;
        }
    }

    // this is my class that computes PI; my question is whether such a class is valid in Flink
    // on a cluster with parallel computation
    public static final class classPI {
        public Integer NumIter;
        private final ExecutionEnvironment env;
        public Double PI;

        // this is the constructor with another ExecutionEnvironment
        public classPI() {
            this.NumIter = 1000000;
            env = ExecutionEnvironment.getExecutionEnvironment();
        }

        // this is the method that contains all data transformations
        public Double compute() throws Exception {
            DataSet<Long> count = env.generateSequence(1, NumIter)
                    .map(new Sampler())
                    .reduce(new SumReducer());
            PI = 4.0 * count.collect().get(0) / NumIter;
            return PI;
        }
    }
}
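For comparison, here is a sketch of one alternative way to hide the transformations, assuming the Flink 1.x DataSet API (flink-java) is on the classpath. Instead of creating a second ExecutionEnvironment inside classPI and calling it from within a map(), the transformations live in an ordinary static method that receives the single driver-side ExecutionEnvironment and returns a lazy DataSet<Double>, so the whole pipeline is built and submitted as one job. The names PiPipeline, estimatePi and ToPi are hypothetical, and this is a sketch to compare against, not a confirmed recommendation from the Flink documentation.

```java
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

// Sketch (hypothetical names): the transformations are hidden behind a static method
// that only builds part of the job plan in the driver program. No ExecutionEnvironment
// is created or used inside any MapFunction.
final class PiPipeline {

    // Same logic as Sampler in the question: 1 if the random point is inside the quarter circle.
    public static final class Sampler implements MapFunction<Long, Long> {
        @Override
        public Long map(Long value) {
            double x = Math.random();
            double y = Math.random();
            return (x * x + y * y) < 1 ? 1L : 0L;
        }
    }

    // Same logic as SumReducer in the question.
    public static final class SumReducer implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }

    // Turns the total hit count into the estimate; numIter is shipped along with the function.
    public static final class ToPi implements MapFunction<Long, Double> {
        private final long numIter;

        public ToPi(long numIter) {
            this.numIter = numIter;
        }

        @Override
        public Double map(Long count) {
            return 4.0 * count / numIter;
        }
    }

    // Encapsulates all the transformations. Nothing executes here: the returned
    // DataSet is lazy and runs only when the caller triggers execution.
    static DataSet<Double> estimatePi(ExecutionEnvironment env, long numIter) {
        return env.generateSequence(1, numIter)
                .map(new Sampler())
                .reduce(new SumReducer())
                .map(new ToPi(numIter));
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // collect() triggers execution of the whole plan as a single job.
        List<Double> result = estimatePi(env, 1_000_000L).collect();
        System.out.println("We estimate Pi to be: " + result.get(0));
    }
}
```

Callers only see estimatePi(env, n), so more transformations can be added inside it without touching the caller, and further methods that take and return DataSets can be chained the same way.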