flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Imbalanced workload between workers
Date Fri, 29 Jan 2016 12:36:11 GMT
Super, thanks a lot.

I think we need to find some time to look through this, but this should
help a lot!

Greetings,
Stephan


On Fri, Jan 29, 2016 at 1:32 PM, Pieter Hameete <phameete@gmail.com> wrote:

> Hi Stephan,
>
> cheers for your response. I have the heap dumps, you can download them
> here (note: they are fairly large, tarballs of up to 2 GB each, and up to
> 12 GB untarred):
>
> *Job Manager:*
>
> after the slowdown series:
> https://storage.googleapis.com/dawn-flink/heapdumps/heapm-after.tar.gz
>
> *Task Manager 0:*
>
> after the slowdown series:
> https://storage.googleapis.com/dawn-flink/heapdumps/heapw0-after.tar.gz
> during the slowdown series:
> https://storage.googleapis.com/dawn-flink/heapdumps/heapw0-during.tar.gz
> during the slowdown series:
> https://storage.googleapis.com/dawn-flink/heapdumps/heapw0-during2.tar.gz
>
> *Task Manager 1:*
>
> after the slowdown series:
> https://storage.googleapis.com/dawn-flink/heapdumps/heapw1-after.tar.gz
> during the slowdown series:
> https://storage.googleapis.com/dawn-flink/heapdumps/heapw1-during.tar.gz
> during the slowdown series:
> https://storage.googleapis.com/dawn-flink/heapdumps/heapw1-during2.tar.gz
>
> I've also attempted to reproduce the issue in a local cluster with a
> single 4-CPU task manager; however, no slowdown has occurred so far.
>
> Please let me know if there is anything else I can provide you with.
>
> - Pieter
>
> 2016-01-28 22:18 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>
>> Hi!
>>
>> Thank you for looking into this.
>>
>> Such a gradual slowdown is often caused by memory leaks, which reduce
>> available memory and make GC more expensive.
>>
>> Can you create a heap dump after the series of runs that cause the
>> slowdown? Then we can have a look whether there are leaked Flink objects,
>> or if the user code is causing that.
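>>
>> If it helps: on a standard HotSpot JVM you can capture such a dump from a
>> running TaskManager process with jmap, e.g.
>>
>>   jmap -dump:live,format=b,file=/tmp/taskmanager-heap.hprof <taskmanager-pid>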
>>
>> Greetings,
>> Stephan
>>
>>
>> On Thu, Jan 28, 2016 at 5:58 PM, Pieter Hameete <phameete@gmail.com>
>> wrote:
>>
>>> Hi Till and Stephan,
>>>
>>> I've got some additional information. I just ran a query 60 times in a
>>> row, and all jobs ran in roughly the same time. I did this for both a
>>> simple job that requires intensive parsing and a more complex query that
>>> has several cogroups. Neither case showed any slowdown. I believe this
>>> indicates that the problem is not unreliable I/O or computational
>>> resources.
>>>
>>> However, when I now run the complete set of 15 different queries again,
>>> with 3 repetitions each, things start slowing down very quickly.
>>>
>>> The code I use to run the jobs can be found here:
>>> https://github.com/PHameete/dawn-flink/blob/feature/loading-algorithm-v2/src/main/scala/wis/dawnflink/performance/DawnBenchmarkSuite.scala
>>>
>>> I run this as follows in the GCloud: ./flink run dawn-flink.jar
>>> gs://dawn-flink/data/split4G gs://dawn-flink/output/ 3 result.csv
>>>
>>> Is there something I am doing incorrectly by running these different
>>> queries in sequence like this? I have also tried parametrizing further so
>>> that I call ./flink run for each separate query, but this did not make a
>>> difference.
>>>
>>> Cheers!
>>>
>>> - Pieter
>>>
>>>
>>>
>>> 2016-01-28 11:28 GMT+01:00 Pieter Hameete <phameete@gmail.com>:
>>>
>>>> Hi Till, Stephan,
>>>>
>>>> Indeed, I think you're right that it is just a coincidence that the three
>>>> jobs started producing output as soon as the first one finished. Getting
>>>> back to my comment that the job was much faster after the upgrade to Flink
>>>> 0.10.1, I searched a bit further. I've now noticed that after setting up a
>>>> fresh GCloud cluster the job runs roughly 15x faster than the case I just
>>>> showed you. I'm executing many, many jobs in sequence and it seems as if
>>>> performance is degrading a lot over time. Let me tell you what I'm doing
>>>> exactly:
>>>>
>>>> I'm running a set of 16 queries in sequence on the cluster. Each query
>>>> is executed multiple (5) times in a row so that I can average the
>>>> measurements for a more reliable estimate of performance in the GCloud.
>>>> This means that I'm running a total of 60 jobs in sequence, on 5 different
>>>> datasets, totalling 300 jobs. For each job I get the execution environment,
>>>> define the query, write to an output file and call env.execute(). After
>>>> each job I clear the output folder so the next job can start fresh.
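>>>>
>>>> In Scala, the loop looks roughly like this (Query here is a stand-in for
>>>> my actual query classes, not the real code):
>>>>
>>>> import org.apache.flink.api.scala._
>>>> import org.apache.flink.core.fs.FileSystem.WriteMode
>>>>
>>>> object BenchmarkSketch {
>>>>   // stand-in for the real query classes
>>>>   trait Query {
>>>>     def build(env: ExecutionEnvironment, inputPath: String): DataSet[String]
>>>>   }
>>>>
>>>>   def runBenchmark(queries: Seq[Query], input: String, output: String,
>>>>                    reps: Int): Unit = {
>>>>     for (query <- queries; rep <- 1 to reps) {
>>>>       // fresh execution environment for every job, as described above
>>>>       val env = ExecutionEnvironment.getExecutionEnvironment
>>>>       query.build(env, input).writeAsText(output, WriteMode.OVERWRITE)
>>>>       val result = env.execute(s"${query.getClass.getSimpleName} rep $rep")
>>>>       println(s"net runtime: ${result.getNetRuntime} ms")
>>>>     }
>>>>   }
>>>> }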
>>>>
>>>> Now I'm wondering what causes this performance degradation. Could this
>>>> have to do with the GCloud (I surely hope they don't scale back my
>>>> performance by 90% ;-)), do I need to do some more cleaning and resetting
>>>> on the Flink cluster, or is my approach above asking for trouble?
>>>>
>>>> Thanks again for your help!
>>>>
>>>> - Pieter
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> 2016-01-28 10:58 GMT+01:00 Till Rohrmann <trohrmann@apache.org>:
>>>>
>>>>> Hi Pieter,
>>>>>
>>>>> you can see in the log that the operators are all started at the same
>>>>> time. However, you're right that they don't finish at the same time. The
>>>>> subtasks which run on the same node exhibit a similar runtime. However,
>>>>> all nodes (not only hadoop-w-0 compared to the others) show different
>>>>> runtimes. I would guess that this is due to some other load on the GCloud
>>>>> machines or some other kind of asymmetry between the hosts.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Thu, Jan 28, 2016 at 10:17 AM, Pieter Hameete <phameete@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Stephan, Till,
>>>>>>
>>>>>> I've watched the Job again and please see the log of the CoGroup
>>>>>> operator:
>>>>>>
>>>>>> [image: Inline image 1]
>>>>>>
>>>>>> All workers process a fairly evenly distributed number of bytes and
>>>>>> records, BUT hadoop-w-0, hadoop-w-2 and hadoop-w-3 don't start working
>>>>>> until hadoop-w-1 is finished. Is this behavior to be expected with a
>>>>>> CoGroup, or could there still be something wrong in the distribution of
>>>>>> the data?
>>>>>>
>>>>>> Kind regards,
>>>>>>
>>>>>> Pieter
>>>>>>
>>>>>> 2016-01-27 21:48 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>>>>>>
>>>>>>> Hi Pieter!
>>>>>>>
>>>>>>> Interesting, but good :-)
>>>>>>>
>>>>>>> I don't think we did much on the hash functions since 0.9.1. I am a
>>>>>>> bit surprised that it made such a difference. Well, as long as it
>>>>>>> improves with the newer version :-)
>>>>>>>
>>>>>>> Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jan 27, 2016 at 9:42 PM, Pieter Hameete <phameete@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Till,
>>>>>>>>
>>>>>>>> I've upgraded to Flink 0.10.1 and ran the job again, without any
>>>>>>>> changes to the code, to see the bytes input and output of the
>>>>>>>> operators for the different workers. To my surprise it is very well
>>>>>>>> balanced between all workers, and because of this the job completed
>>>>>>>> much faster.
>>>>>>>>
>>>>>>>> Are there any changes/fixes between Flink 0.9.1 and 0.10.1 that
>>>>>>>> could cause this to be better for me now?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Pieter
>>>>>>>>
>>>>>>>> 2016-01-27 14:10 GMT+01:00 Pieter Hameete <phameete@gmail.com>:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Cheers for the quick reply Till.
>>>>>>>>>
>>>>>>>>> That would be very useful information to have! I'll upgrade my
>>>>>>>>> project to Flink 0.10.1 tonight and let you know if I can find out
>>>>>>>>> whether there's a skew in the data :-)
>>>>>>>>>
>>>>>>>>> - Pieter
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2016-01-27 13:49 GMT+01:00 Till Rohrmann <trohrmann@apache.org>:
>>>>>>>>>
>>>>>>>>>> Could it be that your data is skewed? This could lead to
>>>>>>>>>> different loads on different task managers.
>>>>>>>>>>
>>>>>>>>>> With the latest Flink version, the web interface should show you
>>>>>>>>>> how many bytes each operator has written and received. There you
>>>>>>>>>> could see if one operator receives more elements than the others.
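>>>>>>>>>>
>>>>>>>>>> If you want to count elements per parallel subtask yourself, a
>>>>>>>>>> small pass-through mapper would also work. Here is a sketch
>>>>>>>>>> (CountingMap is just an illustration, not an existing Flink class):
>>>>>>>>>>
>>>>>>>>>> import org.apache.flink.api.common.functions.RichMapFunction
>>>>>>>>>>
>>>>>>>>>> // passes elements through unchanged, logging how many each
>>>>>>>>>> // parallel subtask has seen when the task shuts down
>>>>>>>>>> class CountingMap[T] extends RichMapFunction[T, T] {
>>>>>>>>>>   private var count = 0L
>>>>>>>>>>   override def map(value: T): T = { count += 1; value }
>>>>>>>>>>   override def close(): Unit = {
>>>>>>>>>>     val subtask = getRuntimeContext.getIndexOfThisSubtask
>>>>>>>>>>     println(s"subtask $subtask processed $count elements")
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> // usage: dataSet.map(new CountingMap[MyType])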
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>>
>>>>>>>>>> On Wed, Jan 27, 2016 at 1:35 PM, Pieter Hameete <
>>>>>>>>>> phameete@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi guys,
>>>>>>>>>>>
>>>>>>>>>>> Currently I am running a job in the GCloud in a configuration
>>>>>>>>>>> with 4 task managers that each have 4 CPUs (for a total
>>>>>>>>>>> parallelism of 16).
>>>>>>>>>>>
>>>>>>>>>>> However, I noticed my job is running much slower than expected,
>>>>>>>>>>> and after some more investigation I found that one of the workers
>>>>>>>>>>> is doing the majority of the work (its CPU load was at 100% while
>>>>>>>>>>> the others were almost idle).
>>>>>>>>>>>
>>>>>>>>>>> My job execution plan can be found here:
>>>>>>>>>>> http://i.imgur.com/fHKhVFf.png
>>>>>>>>>>>
>>>>>>>>>>> The input is split into multiple files, so loading the data is
>>>>>>>>>>> properly distributed over the workers.
>>>>>>>>>>>
>>>>>>>>>>> I am wondering if you can provide me with some tips on how to
>>>>>>>>>>> figure out what is going wrong here:
>>>>>>>>>>>
>>>>>>>>>>>    - Could this imbalance in workload be the result of an
>>>>>>>>>>>    imbalance in the hash partitioning? (See the sketch after
>>>>>>>>>>>    this list.)
>>>>>>>>>>>    - Is there a convenient way to see how many elements each
>>>>>>>>>>>    worker gets to process? Would it work to write the output of
>>>>>>>>>>>    the CoGroup to disk, since each worker writes to its own
>>>>>>>>>>>    output file, and investigate the differences?
>>>>>>>>>>>    - Is there something strange about the execution plan that
>>>>>>>>>>>    could cause this?
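>>>>>>>>>>>
>>>>>>>>>>> For the first point, this is a sketch of what I could run to
>>>>>>>>>>> inspect the key distribution (input and extractKey stand in for
>>>>>>>>>>> my actual dataset and key selector, which are not shown here):
>>>>>>>>>>>
>>>>>>>>>>> import org.apache.flink.api.scala._
>>>>>>>>>>>
>>>>>>>>>>> // count how many records fall onto each CoGroup key; a handful
>>>>>>>>>>> // of very frequent keys would explain the imbalance
>>>>>>>>>>> val keyCounts = input
>>>>>>>>>>>   .map(record => (extractKey(record), 1L))
>>>>>>>>>>>   .groupBy(0)
>>>>>>>>>>>   .sum(1)
>>>>>>>>>>> keyCounts.first(20).print()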
>>>>>>>>>>>
>>>>>>>>>>> Thanks and kind regards,
>>>>>>>>>>>
>>>>>>>>>>> Pieter
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
