flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Till Rohrmann <trohrm...@apache.org>
Subject Re: Flink Iterations vs. While loop
Date Wed, 07 Sep 2016 15:25:10 GMT
Hi Dan,

first a general remark: I fear that your L-BFGS implementation is not well
suited for large scale problems. You might wanna take a look at [1].

In the case of the while loop solution you're actually executing n jobs
with n being the number of iterations. Thus, you have to add the execution
times for all jobs together. Did you do that?

[1] https://papers.nips.cc/paper/5333-large-scale-l-bfgs-using-mapreduce.pdf

On Wed, Sep 7, 2016 at 3:43 PM, Dan Drewes <drewes@campus.tu-berlin.de>
wrote:

> Thank you for your replys so far!
>
> I've uploaded the files to github:
>
> iterations: https://github.com/dan-drewes/thesis/blob/master/
> IterationsLBFGS.java
>
> while loop: https://github.com/dan-drewes/thesis/blob/master/flinkLBFGS.
> java
>
> The additional classes I wrote are there, too.
>
> I had 32 task slots in total, that's 4 on each node, so their should
> actually be enough memory.
>
> I don't know if there is a better way of profiling but I have the
> timelines for both versions attached with this post. As far as I can see
> it, there is not much difference in reading the data. However, maybe you
> can see anything i didn't.
>
> Thanks again for your help,
> Dan
>
>
>
> Am 07.09.2016 um 10:23 schrieb Till Rohrmann:
>
> Usually, the while loop solution should perform much worse since it will
> execute with each new iteration all previous iterations steps without
> persisting the intermediate results. Thus, it should have a quadratic
> complexity in terms of iteration step operations instead of a linear
> complexity. Additionally the while loop will suffer from memory
> fragmentation because of the explicit DAG unrolling.
>
> I agree with Theo that access to the full code would help a lot to
> pinpoint the problem.
>
> Cheers,
> Till
>
> On Tue, Sep 6, 2016 at 6:50 PM, Theodore Vasiloudis <
> theodoros.vasiloudis@gmail.com> wrote:
>
>> Have you tried profiling the application to see where most of the time is
>> spent during the runs?
>>
>> If most of the time is spent reading in the data maybe any difference
>> between the two methods is being obscured.
>>
>> --
>> Sent from a mobile device. May contain autocorrect errors.
>>
>> On Sep 6, 2016 4:55 PM, "Greg Hogan" <code@greghogan.com> wrote:
>>
>>> Hi Dan,
>>>
>>> Flink currently allocates each task slot an equal portion of managed
>>> memory. I don't know the best way to count task slots.
>>>   https://ci.apache.org/projects/flink/flink-docs-master/conce
>>> pts/index.html#workers-slots-resources
>>>
>>> If you assign TaskManagers less memory then Linux will use the memory to
>>> cache spill files.
>>>
>>> Greg
>>>
>>> On Fri, Sep 2, 2016 at 11:30 AM, Dan Drewes <drewes@campus.tu-berlin.de>
>>> wrote:
>>>
>>>> Hi Greg,
>>>>
>>>> thanks for your response!
>>>>
>>>> I just had a look and realized that it's just about 85 GB of data.
>>>> Sorry about that wrong information.
>>>>
>>>> It's read from a csv file on the master node's local file system. The 8
>>>> nodes have more than 40 GB available memory each and since the data is
>>>> equally distributed I assume there should be no need to spill anything on
>>>> disk.
>>>>
>>>> There are 9 iterations.
>>>>
>>>> Is it possible that also with Flink Iterations the data is repeatedly
>>>> distributed? Or the other way around: Might it be that flink "remembers"
>>>> somehow that the data is already distributed even for the while loop?
>>>>
>>>> -Dan
>>>>
>>>>
>>>>
>>>> Am 02.09.2016 um 16:39 schrieb Greg Hogan:
>>>>
>>>> Hi Dan,
>>>>
>>>> Where are you reading the 200 GB "data" from? How much memory per node?
>>>> If the DataSet is read from a distributed filesystem and if with iterations
>>>> Flink must spill to disk then I wouldn't expect much difference. About how
>>>> many iterations are run in the 30 minutes? I don't know that this is
>>>> reported explicitly, but if your convergence function only has one input
>>>> record per iteration then the reported total is the iteration count.
>>>>
>>>> One other thought, we should soon have support for object reuse with
>>>> arrays (FLINK-3695). This would be implemented as DoubleValueArray or
>>>> ValueArray<DoubleValue> rather than double[] but it would be interesting
to
>>>> test for a change in performance.
>>>>
>>>> Greg
>>>>
>>>> On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes <drewes@campus.tu-berlin.de>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> for my bachelor thesis I'm testing an implementation of L-BFGS
>>>>> algorithm with Flink Iterations against a version without Flink Iterations
>>>>> but a casual while loop instead. Both programs use the same Map and Reduce
>>>>> transformations in each iteration. It was expected, that the performance
of
>>>>> the Flink Iterations would scale better with increasing size of the input
>>>>> data set. However, the measured results on an ibm-power-cluster are very
>>>>> similar for both versions, e.g. around 30 minutes for 200 GB data. The
>>>>> cluster has 8 nodes, was configured with 4 slots per node and I used
a
>>>>> total parallelism of 32.
>>>>> In every Iteration of the while loop a new flink job is started and I
>>>>> thought, that also the data would be distributed over the network again
in
>>>>> each iteration which should consume a significant and measurable amount
of
>>>>> time. Is that thought wrong or what is the computional overhead of the
>>>>> flink iterations that is equalizing this disadvantage?
>>>>> I include the relevant part of both programs and also attach the
>>>>> generated execution plans.
>>>>> Thank you for any ideas as I could not find much about this issue in
>>>>> the flink docs.
>>>>>
>>>>> Best, Dan
>>>>>
>>>>> *Flink Iterations:*
>>>>>
>>>>> DataSet<double[]> data = ...
>>>>>
>>>>> State state = initialState(m, initweights,0,new double[initweights.length]);
>>>>> DataSet<State> statedataset = env.fromElements(state);
>>>>> //start of iteration sectionIterativeDataSet<State> loop= statedataset.iterate(niter);;
>>>>>
>>>>>
>>>>> DataSet<State> statewithnewlossgradient = data.map(difffunction).withBroadcastSet(loop,
"state")
>>>>>               .reduce(accumulate)
>>>>>               .map(new NormLossGradient(datasize))
>>>>>               .map(new SetLossGradient()).withBroadcastSet(loop,"state")
>>>>>               .map(new LBFGS());
>>>>>
>>>>>
>>>>> DataSet<State> converged = statewithnewlossgradient.filter(
>>>>>    new FilterFunction<State>() {
>>>>>       @Override      public boolean filter(State value) throws Exception
{
>>>>>          if(value.getIflag()[0] == 0){
>>>>>             return false;
>>>>>          }
>>>>>          return true;
>>>>>       }
>>>>>    }
>>>>> );
>>>>>
>>>>> DataSet<State> finalstate = loop.closeWith(statewithnewlossgradient,converged);
>>>>>
>>>>> *While loop: *
>>>>>
>>>>> DataSet<double[]> data =...
>>>>> State state = initialState(m, initweights,0,new double[initweights.length]);
>>>>> int cnt=0;do{
>>>>>    LBFGS lbfgs = new LBFGS();
>>>>>    statedataset=data.map(difffunction).withBroadcastSet(statedataset,
"state")
>>>>>       .reduce(accumulate)
>>>>>       .map(new NormLossGradient(datasize))
>>>>>       .map(new SetLossGradient()).withBroadcastSet(statedataset,"state")
>>>>>       .map(lbfgs);
>>>>>    cnt++;
>>>>> }while (cnt<niter && statedataset.collect().get(0).getIflag()[0]
!= 0);
>>>>>
>>>>>
>
> <https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=emailclient>
Virenfrei.
> www.avast.com
> <https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=emailclient>
>

Mime
View raw message