The question is how to encapsulate numerous transformations into one object, or perhaps a function, in an Apache Flink Java setting. I have tried to investigate this question using the example of a Pi calculation (see below). I am wondering whether the suggested approach is valid from Flink's point of view. It works on one computer; however, I do not know how it will behave in a cluster setup. The code is given below, and the main idea behind it is as follows:
  1. Create a class named classPI, whose method compute() performs all data transformations (see more about it below).
  2. In the main method, create a DataSet as in DataSet<classPI> opi = env.fromElements(new classPI());
  3. Create DataSet<Double> PI as the output of a map() transformation that calls the method compute() of the classPI object, as in
    DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() { public Double map(classPI objPI) throws Exception { return objPI.compute(); } });
  4. Now about classPI:
    • The constructor instantiates an ExecutionEnvironment, which is local to this class, as in
      public classPI() { this.NumIter = 1000000; env = ExecutionEnvironment.getExecutionEnvironment(); }
Thus, the code has two ExecutionEnvironment objects: one in main and another in the class classPI.
The whole code is given below. Again, the question is whether this is a valid approach for encapsulating data transformations in a class in a Flink setup that is supposed to be parallelizable on a cluster. Is there a better way to hide the details of data transformations?
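For comparison, a variant I have been considering keeps a single ExecutionEnvironment in main and hides the transformations behind a plain static method that only builds and returns a DataSet. This is a sketch under my own assumptions, not something I have confirmed on a cluster: the names PiTransformations, estimatePi and ToPi are hypothetical, Sampler and SumReducer are copied from my code, and it assumes the Flink DataSet API (flink-java, possibly plus flink-clients for local execution, depending on the Flink version) is on the classpath.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PiTransformations {

    // Same as my Sampler: 1 if a random point in the unit square lies inside the quarter circle, else 0.
    public static final class Sampler implements MapFunction<Long, Long> {
        @Override
        public Long map(Long value) {
            double x = Math.random();
            double y = Math.random();
            return (x * x + y * y) < 1 ? 1L : 0L;
        }
    }

    // Same as my SumReducer: adds up the hits.
    public static final class SumReducer implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }

    // Hypothetical helper function: turns the hit count into a Pi estimate.
    // It only carries a plain long, so it is cheap to serialize and ship to workers.
    public static final class ToPi implements MapFunction<Long, Double> {
        private final long numIter;

        public ToPi(long numIter) {
            this.numIter = numIter;
        }

        @Override
        public Double map(Long count) {
            return 4.0 * count / numIter;
        }
    }

    // Hypothetical helper: builds the whole pipeline on the client side and returns a DataSet.
    // Nothing runs here; the job executes only when the caller triggers it (e.g. with collect()).
    public static DataSet<Double> estimatePi(ExecutionEnvironment env, long numIter) {
        return env.generateSequence(1, numIter)
                  .map(new Sampler())
                  .reduce(new SumReducer())
                  .map(new ToPi(numIter));
    }

    public static void main(String[] args) throws Exception {
        // The only ExecutionEnvironment, created in the driver program.
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        double pi = estimatePi(env, 1_000_000L).collect().get(0);
        System.out.println("We estimate Pi to be: " + pi);
    }
}
```

The difference from my approach is that the objects shipped to the workers (Sampler, SumReducer, ToPi) hold only plain serializable state, while the ExecutionEnvironment never leaves main.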
Thanks a lot!

-------------------------The code ----------------------

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PiEstimation {

    public static void main(String[] args) throws Exception {
        // this is one ExecutionEnvironment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // this is the critical DataSet with my classPI that computes PI
        DataSet<classPI> opi = env.fromElements(new classPI());
        // this map calls the method compute() of class classPI that computes PI
        DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
            @Override
            public Double map(classPI objPI) throws Exception {
                // this is how I call the method compute() that calculates PI using transformations
                return objPI.compute();
            }
        });

        double pi = PI.collect().get(0);
        System.out.println("We estimate Pi to be: " + pi);
    }

    // this class is of no importance for my question; however, it is relevant for the Pi calculation
    public static class Sampler implements MapFunction<Long, Long> {
        @Override
        public Long map(Long value) {
            double x = Math.random();
            double y = Math.random();
            // 1 if the random point (x, y) falls inside the quarter circle, else 0
            return (x * x + y * y) < 1 ? 1L : 0L;
        }
    }

    // this class is of no importance for my question; however, it is relevant for the Pi calculation
    public static final class SumReducer implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long value1, Long value2) {
            return value1 + value2;
        }
    }

    // this is my class that computes PI; my question is whether such a class is valid in Flink on a cluster with parallel computation
    public static final class classPI {
        public Integer NumIter;
        private final ExecutionEnvironment env;
        public Double PI;

        // this is the constructor with another ExecutionEnvironment
        public classPI() {
            this.NumIter = 1000000;
            env = ExecutionEnvironment.getExecutionEnvironment();
        }

        // this is the method that contains all data transformations
        public Double compute() throws Exception {
            DataSet<Long> count = env.generateSequence(1, NumIter)
                                     .map(new Sampler())
                                     .reduce(new SumReducer());
            PI = 4.0 * count.collect().get(0) / NumIter;
            return PI;
        }
    }
}
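For reference, the estimate in compute() rests on standard Monte Carlo reasoning that is independent of Flink: Sampler draws (x, y) uniformly from the unit square, the quarter disc x² + y² < 1 covers an area of π/4, and SumReducer counts the hits, so

```latex
P\left(x^2 + y^2 < 1\right) = \frac{\pi}{4}, \qquad x, y \sim U(0,1)
\quad\Longrightarrow\quad
\pi \approx 4 \cdot \frac{\mathrm{count}}{\mathrm{NumIter}},
\qquad
\mathrm{count} = \sum_{i=1}^{\mathrm{NumIter}} \mathbf{1}\left[x_i^2 + y_i^2 < 1\right]
```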