flink-user mailing list archives

From: Dan Drewes <dre...@campus.tu-berlin.de>
Subject: Re: Flink Iterations vs. While loop
Date: Fri, 02 Sep 2016 15:30:23 GMT
Hi Greg,

thanks for your response!

I just had a look and realized that it's actually only about 85 GB of data. 
Sorry for the wrong information.

It's read from a CSV file on the master node's local file system. The 8 
nodes have more than 40 GB of available memory each, and since the data is 
evenly distributed I assume there should be no need to spill anything 
to disk.

There are 9 iterations.

Is it possible that the data is repeatedly distributed with Flink 
Iterations as well? Or, the other way around: might it be that Flink 
somehow "remembers" that the data is already distributed, even in the 
while loop version?

-Dan



On 02.09.2016 at 16:39, Greg Hogan wrote:
> Hi Dan,
>
> Where are you reading the 200 GB "data" from? How much memory per 
> node? If the DataSet is read from a distributed filesystem and if with 
> iterations Flink must spill to disk then I wouldn't expect much 
> difference. About how many iterations are run in the 30 minutes? I 
> don't know that this is reported explicitly, but if your convergence 
> function only has one input record per iteration then the reported 
> total is the iteration count.
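>
> For example, one way to check the count explicitly is to log the
> superstep from a rich function inside the iteration (just a sketch;
> the extra map is purely for logging):
>
>     .map(new RichMapFunction<State, State>() {
>        @Override
>        public State map(State value) throws Exception {
>           // inside an iteration, rich functions have access to the
>           // IterationRuntimeContext and the current superstep number
>           System.out.println("superstep: "
>                 + getIterationRuntimeContext().getSuperstepNumber());
>           return value;
>        }
>     })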
>
> One other thought: we should soon have support for object reuse with 
> arrays (FLINK-3695). This would be implemented as DoubleValueArray or 
> ValueArray<DoubleValue> rather than double[], but it would be 
> interesting to test for a change in performance.
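>
> As a very rough sketch of the change (hypothetical until the ticket
> lands, so class and method names may differ):
>
>     // hypothetical API from FLINK-3695 -- not yet released
>     // DataSet<double[]> data = ...              // current version
>     DataSet<ValueArray<DoubleValue>> data = ...  // array of mutable DoubleValues
>     // Because ValueArray is a mutable Value type, Flink can reuse
>     // instances instead of allocating a new double[] for every record.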
>
> Greg
>
> On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes <drewes@campus.tu-berlin.de> wrote:
>
>     Hi,
>
>     for my bachelor thesis I'm testing an implementation of the L-BFGS
>     algorithm with Flink Iterations against a version without Flink
>     Iterations that uses a plain while loop instead. Both programs use
>     the same Map and Reduce transformations in each iteration. I
>     expected the performance of the Flink Iterations version to scale
>     better with increasing size of the input data set. However, the
>     measured results on an ibm-power-cluster are very similar for both
>     versions, e.g. around 30 minutes for 200 GB of data. The cluster
>     has 8 nodes, was configured with 4 slots per node, and I used a
>     total parallelism of 32.
>     In every iteration of the while loop a new Flink job is started,
>     and I thought the data would also be redistributed over the
>     network each time, which should consume a significant and
>     measurable amount of time. Is that assumption wrong, or what
>     computational overhead of the Flink Iterations is offsetting this
>     disadvantage?
>     I've included the relevant parts of both programs and also
>     attached the generated execution plans.
>     Thank you for any ideas; I could not find much about this issue
>     in the Flink docs.
>
>     Best, Dan
>
>     *Flink Iterations:*
>
>     DataSet<double[]> data = ...
>
>     State state = initialState(m, initweights, 0, new double[initweights.length]);
>     DataSet<State> statedataset = env.fromElements(state);
>
>     // start of iteration section
>     IterativeDataSet<State> loop = statedataset.iterate(niter);
>
>
>     DataSet<State> statewithnewlossgradient = data.map(difffunction).withBroadcastSet(loop, "state")
>         .reduce(accumulate)
>         .map(new NormLossGradient(datasize))
>         .map(new SetLossGradient()).withBroadcastSet(loop, "state")
>         .map(new LBFGS());
>
>
>     DataSet<State> converged = statewithnewlossgradient.filter(
>         new FilterFunction<State>() {
>            @Override
>            public boolean filter(State value) throws Exception {
>               // termination criterion: stays non-empty while iflag[0] != 0;
>               // once it becomes empty (converged), the iteration stops
>               return value.getIflag()[0] != 0;
>            }
>         }
>     );
>
>     DataSet<State> finalstate = loop.closeWith(statewithnewlossgradient, converged);
>
>     *While loop:*
>
>     DataSet<double[]> data = ...
>     State state = initialState(m, initweights, 0, new double[initweights.length]);
>     DataSet<State> statedataset = env.fromElements(state);
>
>     int cnt = 0;
>     do {
>         LBFGS lbfgs = new LBFGS();
>         statedataset = data.map(difffunction).withBroadcastSet(statedataset, "state")
>            .reduce(accumulate)
>            .map(new NormLossGradient(datasize))
>            .map(new SetLossGradient()).withBroadcastSet(statedataset, "state")
>            .map(lbfgs);
>         cnt++;
>     } while (cnt < niter && statedataset.collect().get(0).getIflag()[0] != 0);
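>
>     (Note: the collect() call in the loop condition is what launches
>     the new job on every pass -- execution in the DataSet API is lazy,
>     so each pass roughly boils down to the following; variable names
>     are just for illustration:)
>
>     List<State> states = statedataset.collect();  // submits and runs a full Flink job
>     boolean continueLoop = cnt < niter && states.get(0).getIflag()[0] != 0;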
>
