The question is how to encapsulate numerous transformations in one object, or maybe a function, in Apache Flink (Java). I have tried to investigate this using a Pi calculation as an example (see below). I am wondering whether the suggested approach is valid from Flink's point of view. It works on one computer; however, I do not know how it will behave in a cluster setup. The code is given below, and the main idea behind it is as follows:
- Create a class named classPI whose method compute() does all the data transformations; see more about it below.
- In the main method, create a DataSet as in DataSet<classPI> opi = env.fromElements(new classPI());
- Create DataSet<Double> PI, which is the output of a map() transformation that calls the compute() method of the classPI object, as in DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() { public Double map(classPI objPI) throws Exception { return objPI.compute(); }});
- Now about classPI:
- The constructor instantiates an ExecutionEnvironment that is local to this class, as in public classPI(){ this.NumIter=1000000; env = ExecutionEnvironment.getExecutionEnvironment(); }
Thus, the code has two ExecutionEnvironment objects: one in main and another in the class classPI.
- It has a method compute() that runs all the data transformations (in this example it is just several lines, but potentially it might contain tons of Flink transformations): public Double compute() throws Exception { DataSet<Long> count = env.generateSequence(1, NumIter).map(new Sampler()).reduce(new SumReducer()); PI = 4.0*count.collect().get(0)/NumIter; return PI; }
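To make clear what compute() is meant to calculate, here is the same Monte Carlo estimate with Flink stripped out. This is only a minimal plain-Java sketch: the class name LocalPiSketch, the methods sample() and estimate(), and the seed parameter are hypothetical names introduced for illustration and are not part of my Flink program. It says nothing about how Flink would distribute the work.

```java
import java.util.Random;
import java.util.stream.LongStream;

// Hypothetical helper, not part of the Flink program: the same estimate without Flink.
public class LocalPiSketch {

    // Mirrors Sampler: 1 if a random point in the unit square falls inside the quarter circle, else 0.
    static long sample(Random rnd) {
        double x = rnd.nextDouble();
        double y = rnd.nextDouble();
        return (x * x + y * y) < 1 ? 1L : 0L;
    }

    // Mirrors compute(): sum the samples (the role of SumReducer) and scale by 4 / numIter.
    // The seed is an assumption added here only to make runs repeatable.
    static double estimate(long numIter, long seed) {
        Random rnd = new Random(seed);
        long hits = LongStream.rangeClosed(1, numIter)
                .map(i -> sample(rnd))
                .sum();
        return 4.0 * hits / numIter;
    }

    public static void main(String[] args) {
        System.out.println("We estimate Pi to be: " + estimate(1_000_000L, 42L));
    }
}
```

The fraction of points that land inside the quarter circle approximates Pi/4, which is why the sum is multiplied by 4 and divided by the number of iterations.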
The whole code is given below. Again, the question is whether this is a valid approach for encapsulating data transformations in a class in a Flink setup that is supposed to be parallelizable and run on a cluster. Is there a better way to hide the details of data transformations? Thanks a lot!
-------------------------The code ----------------------
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PiEstimation {
    public static void main(String[] args) throws Exception {
        // this is one ExecutionEnvironment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // this is the critical DataSet holding my classPI object that computes PI
        DataSet<classPI> opi = env.fromElements(new classPI());
        // this map calls the method compute() of class classPI that computes PI
        DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
            @Override
            public Double map(classPI objPI) throws Exception {
                // this is how I call method compute() that calculates PI using transformations
                return objPI.compute();
            }
        });
        double pi = PI.collect().get(0);
        System.out.println("We estimate Pi to be: " + pi);
    }
    // this class is of no importance for my question; however, it is relevant for the pi calculation
    public static class Sampler implements MapFunction<Long, Long> {
        @Override
        public Long map(Long value) {
            double x = Math.random();
            double y = Math.random();
            return (x * x + y * y) < 1 ? 1L : 0L;
        }
    }

    // this class is of no importance for my question; however, it is relevant for the pi calculation
    public static final class SumReducer implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long value1, Long value2) {
            return value1 + value2;
        }
    }
    // this is my class that computes PI; my question is whether such a class is valid in Flink on a cluster with parallel computation
    public static final class classPI {
        public Integer NumIter;
        private final ExecutionEnvironment env;
        public Double PI;

        // this is the constructor with another ExecutionEnvironment
        public classPI() {
            this.NumIter = 1000000;
            env = ExecutionEnvironment.getExecutionEnvironment();
        }

        // this is the method that contains all data transformations
        public Double compute() throws Exception {
            DataSet<Long> count = env.generateSequence(1, NumIter)
                    .map(new Sampler())
                    .reduce(new SumReducer());
            PI = 4.0 * count.collect().get(0) / NumIter;
            return PI;
        }
    }
}