flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Imbalanced workload between workers
Date Thu, 28 Jan 2016 21:18:44 GMT
Hi!

Thank you for looking into this.

Such a gradual slowdown is often caused by memory leaks, which reduce the
available memory and make garbage collection more expensive.

Can you create a heap dump after the series of runs that cause the
slowdown? Then we can have a look at whether there are leaked Flink objects,
or whether the user code is causing the leak.
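
A heap dump of a TaskManager JVM can be taken with the standard JDK tools,
for example (the pid being that of the TaskManager process on the slow
worker):

  jmap -dump:live,format=b,file=taskmanager-heap.hprof <taskmanager-pid>

The resulting file can then be inspected with a tool like Eclipse MAT or
VisualVM.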

Greetings,
Stephan


On Thu, Jan 28, 2016 at 5:58 PM, Pieter Hameete <phameete@gmail.com> wrote:

> Hi Till and Stephan,
>
> I've got some additional information. I just ran a query 60 times in a
> row, and all jobs ran in roughly the same time. I did this for both a
> simple job that requires intensive parsing and a more complex query that
> has several cogroups. Neither of the cases showed any slowing down. I
> believe this is an indication that it is not a problem with unreliable I/O
> or computational resources.
>
> However, when I now run the complete set of 15 different queries again,
> with 3 repetitions each, things start slowing down very quickly.
>
> The code I use to run the jobs can be found here:
> https://github.com/PHameete/dawn-flink/blob/feature/loading-algorithm-v2/src/main/scala/wis/dawnflink/performance/DawnBenchmarkSuite.scala
>
> I run this as follows in the GCloud: ./flink run dawn-flink.jar
> gs://dawn-flink/data/split4G gs://dawn-flink/output/ 3 result.csv
>
> Is there something I am doing incorrectly by running these different
> queries in sequence like this? I have tried parametrizing further so that I
> call ./flink run separately for each query, but this did not make a
> difference.
>
> Cheers!
>
> - Pieter
>
>
>
> 2016-01-28 11:28 GMT+01:00 Pieter Hameete <phameete@gmail.com>:
>
>> Hi Till, Stephan,
>>
>> Indeed, I think you're right that it is just a coincidence that the three
>> jobs started producing output as soon as the first one finished. Getting
>> back to my comment that the job was much faster after the upgrade to Flink
>> 0.10.1, I searched a bit further. I've now noticed that after setting up a
>> fresh GCloud cluster the job runs roughly 15x faster than in the case I
>> just showed you. I'm executing many, many jobs in sequence and it seems as
>> if performance degrades a lot over time. Let me tell you exactly what I'm
>> doing:
>>
>> I'm running a set of 16 queries in sequence on the cluster. Each query is
>> executed multiple (5) times in a row so I can average the measurements for
>> a more reliable estimate of performance in the GCloud. This means I'm
>> running a total of 60 jobs in sequence on each of 5 different datasets,
>> totalling 300 jobs. For each job I get the execution environment, define
>> the query, write to an output file and call env.execute(). After each job
>> I clear the output folder so the next job can start fresh.
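>>
>> Schematically, each iteration does something like this (a simplified
>> sketch with placeholder names, not the actual benchmark code):
>>
>>   import org.apache.flink.api.scala._
>>   import org.apache.flink.core.fs.FileSystem.WriteMode
>>
>>   for (dataset <- datasets; query <- queries; run <- 1 to repetitions) {
>>     val env = ExecutionEnvironment.getExecutionEnvironment
>>     // buildQuery is a placeholder for defining the sources and the query
>>     val result: DataSet[String] = buildQuery(env, query, dataset)
>>     result.writeAsText(outputDir, WriteMode.OVERWRITE)
>>     val jobResult = env.execute(query + " on " + dataset + ", run " + run)
>>     runtimes += jobResult.getNetRuntime // record runtime, then clear the output folder
>>   }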
>>
>> Now I'm wondering what causes this performance degradation. Could this
>> have to do with the GCloud (I surely hope they don't scale back my
>> performance by 90% ;-) ), do I need to do some more cleaning and resetting
>> on the Flink cluster, or is my approach above asking for trouble?
>>
>> Thanks again for your help!
>>
>> - Pieter
>>
>>
>>
>>
>>
>> 2016-01-28 10:58 GMT+01:00 Till Rohrmann <trohrmann@apache.org>:
>>
>>> Hi Pieter,
>>>
>>> you can see in the log that the operators are all started at the same
>>> time. However, you're right that they don't finish at the same time. The
>>> subtasks which run on the same node exhibit similar runtimes, but all
>>> nodes (not only hadoop-w-0 compared to the others) show different
>>> runtimes. I would guess that this is due to some other load on the GCloud
>>> machines or some other kind of asymmetry between the hosts.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Jan 28, 2016 at 10:17 AM, Pieter Hameete <phameete@gmail.com>
>>> wrote:
>>>
>>>> Hi Stephen, Till,
>>>>
>>>> I've looked at the job again; please see the log of the CoGroup
>>>> operator:
>>>>
>>>> [image: inline image 1]
>>>>
>>>> All workers process a fairly evenly distributed amount of bytes and
>>>> records, BUT hadoop-w-0, hadoop-w-2 and hadoop-w-3 don't start working
>>>> until hadoop-w-1 is finished. Is this behavior to be expected with a
>>>> CoGroup, or could there still be something wrong in the distribution of
>>>> the data?
>>>>
>>>> Kind regards,
>>>>
>>>> Pieter
>>>>
>>>> 2016-01-27 21:48 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>>>>
>>>>> Hi Pieter!
>>>>>
>>>>> Interesting, but good :-)
>>>>>
>>>>> I don't think we did much on the hash functions since 0.9.1. I am a
>>>>> bit surprised that it made such a difference. Well, as long as it improves
>>>>> with the newer version :-)
>>>>>
>>>>> Greetings,
>>>>> Stephan
>>>>>
>>>>>
>>>>> On Wed, Jan 27, 2016 at 9:42 PM, Pieter Hameete <phameete@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Till,
>>>>>>
>>>>>> I've upgraded to Flink 0.10.1 and ran the job again without any
>>>>>> changes to the code, to see the bytes input and output of the operators
>>>>>> on the different workers. To my surprise it is very well balanced
>>>>>> between all workers, and because of this the job completed much faster.
>>>>>>
>>>>>> Are there any changes/fixes between Flink 0.9.1 and 0.10.1 that could
>>>>>> cause this to be better for me now?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Pieter
>>>>>>
>>>>>> 2016-01-27 14:10 GMT+01:00 Pieter Hameete <phameete@gmail.com>:
>>>>>>
>>>>>>>
>>>>>>> Cheers for the quick reply Till.
>>>>>>>
>>>>>>> That would be very useful information to have! I'll upgrade my
>>>>>>> project to Flink 0.10.1 tonight and let you know if I can find out
>>>>>>> whether there's a skew in the data :-)
>>>>>>>
>>>>>>> - Pieter
>>>>>>>
>>>>>>>
>>>>>>> 2016-01-27 13:49 GMT+01:00 Till Rohrmann <trohrmann@apache.org>:
>>>>>>>
>>>>>>>> Could it be that your data is skewed? This could lead to different
>>>>>>>> loads on different task managers.
>>>>>>>>
>>>>>>>> With the latest Flink version, the web interface should show you how
>>>>>>>> many bytes each operator has written and received. There you could
>>>>>>>> see if one operator receives more elements than the others.
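>>>>>>>>
>>>>>>>> If you want to check the key distribution directly in the job, a
>>>>>>>> small sketch like this (placeholder names, assuming you can extract
>>>>>>>> the CoGroup key from each record) counts the elements per key:
>>>>>>>>
>>>>>>>>   import org.apache.flink.api.scala._
>>>>>>>>
>>>>>>>>   // input: the DataSet feeding the CoGroup; keyOf: placeholder key extractor
>>>>>>>>   val countsPerKey = input
>>>>>>>>     .map(r => (keyOf(r), 1L))
>>>>>>>>     .groupBy(0)
>>>>>>>>     .sum(1)
>>>>>>>>   countsPerKey.print() // a few very large counts would indicate skew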
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Wed, Jan 27, 2016 at 1:35 PM, Pieter Hameete <phameete@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi guys,
>>>>>>>>>
>>>>>>>>> Currently I am running a job in the GCloud in a configuration with
>>>>>>>>> 4 task managers that each have 4 CPUs (for a total parallelism of 16).
>>>>>>>>>
>>>>>>>>> However, I noticed my job is running much slower than expected, and
>>>>>>>>> after some more investigation I found that one of the workers is
>>>>>>>>> doing a majority of the work (its CPU load was at 100% while the
>>>>>>>>> others were almost idle).
>>>>>>>>>
>>>>>>>>> My job execution plan can be found here:
>>>>>>>>> http://i.imgur.com/fHKhVFf.png
>>>>>>>>>
>>>>>>>>> The input is split into multiple files so loading the data is
>>>>>>>>> properly distributed over the workers.
>>>>>>>>>
>>>>>>>>> I am wondering if you can provide me with some tips on how to figure
>>>>>>>>> out what is going wrong here:
>>>>>>>>>
>>>>>>>>>    - Could this imbalance in workload be the result of an imbalance
>>>>>>>>>    in the hash partitioning?
>>>>>>>>>    - Is there a convenient way to see how many elements each worker
>>>>>>>>>    gets to process? Would it work to write the output of the CoGroup
>>>>>>>>>    to disk (each worker writes to its own output file) and
>>>>>>>>>    investigate the differences?
>>>>>>>>>    - Is there something strange about the execution plan that could
>>>>>>>>>    cause this?
>>>>>>>>>
>>>>>>>>> Thanks and kind regards,
>>>>>>>>>
>>>>>>>>> Pieter
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
