flink-user mailing list archives

From Pieter Hameete <phame...@gmail.com>
Subject Re: Imbalanced workload between workers
Date Sat, 06 Feb 2016 14:38:39 GMT
Hi Stephan,

Thank you for your time looking into this. I might be able to check those
things later if you believe it would be interesting to find the source of
this problem.

For me the Google Cloud is getting too expensive to use right now. Instead
I found a YARN cluster that I can use for free, and I'm hoping that getting
fresh containers for each job will be a good workaround in the meantime.

Cheers,

- Pieter



2016-02-06 15:12 GMT+01:00 Stephan Ewen <sewen@apache.org>:

> Hi!
>
> Then the heap dump actually looks non-suspicious. The 8.x GB of managed
> memory corresponds to what is expected.
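> (Presumably: with 12 GB of heap and the default taskmanager.memory.fraction
> of 0.7, roughly 12 * 0.7 = 8.4 GB of managed memory would be expected,
> matching the 8.x GB observed.)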
>
> If there is no memory leak, what could be the reason?
>
>   - Thread leaks? You can try and create thread dumps (via jstack) after
> the slowdown (between invocations of the program).
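>
>   A minimal sketch, assuming the TaskManager PID can be found via jps:
>
>     jps -l                      # find the TaskManager process id
>     jstack <pid> > threads.txt  # dump all thread stacks
>
>   Diffing dumps taken a few runs apart would show whether the number of
>   threads keeps growing.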
>
>   - Is the JITer going into weird behavior, in the end not compiling as
> aggressively? Maybe classes do not get properly unloaded, or the code cache
> (where JITted assembler code is stored) flows over.
>
> Can you check at the beginning of the log files which garbage collectors
> are set? Is class unloading activated?
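>
>   A sketch of what to look for, assuming a HotSpot JVM: GC logging via
>   -XX:+PrintGCDetails -XX:+PrintGCTimeStamps, class unloading events via
>   -XX:+TraceClassUnloading, and
>
>     java -XX:+PrintFlagsFinal -version | grep ClassUnloading
>
>   to check whether class unloading is enabled at all.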
>
> Greetings,
> Stephan
>
>
>
> On Fri, Feb 5, 2016 at 6:14 PM, Pieter Hameete <phameete@gmail.com> wrote:
>
>> Hi Stephan,
>>
>> cheers for looking at this :-) The TaskManagers have 12GB of available
>> heap space configured using taskmanager.heap.mb. The configurations that
>> you mention should be at their defaults.
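>>
>> For reference, a sketch of the relevant flink-conf.yaml entries as I
>> understand them (exact values assumed):
>>
>>   taskmanager.heap.mb: 12288
>>   # taskmanager.memory.size: not set, so the fraction below applies
>>   # taskmanager.memory.fraction: 0.7 (default)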
>>
>> - Pieter
>>
>>
>>
>> 2016-02-05 18:10 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>>
>>> EDIT: The byte[] make up 98% of the memory. That seems correct. 80%
>>> would mean something else is consuming a lot, which should not be the
>>> case.
>>>
>>> On Fri, Feb 5, 2016 at 6:09 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>
>>>> You are using Flink 0.10, right?
>>>>
>>>> The byte[] are simply Flink's reserved managed memory. It is expected
>>>> that they occupy a large portion of the memory (assuming you run 0.10
>>>> in BATCH mode).
>>>> These byte[] should also not grow or shrink in number, so they should
>>>> not cause degradation.
>>>>
>>>> I also cannot spot anything suspicious (in the after-slowdown dumps).
>>>> The fact that the byte[] make up 80% of the memory means that nothing
>>>> else can really be leaking a lot.
>>>>
>>>> Did you configure a specific amount of memory (taskmanager.memory.size
>>>> or taskmanager.memory.fraction) ?
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>>
>>>> On Wed, Feb 3, 2016 at 10:04 AM, Pieter Hameete <phameete@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Stephan,
>>>>>
>>>>> did you manage to take a look at the dumps? I have looked myself but I
>>>>> could not find anything that would point towards a memory leak in my
>>>>> user code. I have a parser that creates quite a lot of objects, but
>>>>> these are not abundantly present in the heap dump. I did see that the
>>>>> majority of the heap consisted of byte[] objects (but this is normal?)
>>>>> as well as Akka objects in another dump.
>>>>>
>>>>> I am not familiar enough with Flink to know what is normal to see in
>>>>> the heap dumps. I am looking forward to hearing what you think :-)
>>>>>
>>>>> - Pieter
>>>>>
>>>>> 2016-01-29 13:36 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>>>>>
>>>>>> Super, thanks a lot.
>>>>>>
>>>>>> I think we need to find some time and look through this, but this
>>>>>> should help a lot!
>>>>>>
>>>>>> Greetings,
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>> On Fri, Jan 29, 2016 at 1:32 PM, Pieter Hameete <phameete@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Stephan,
>>>>>>>
>>>>>>> cheers for your response. I have the heap dumps; you can download
>>>>>>> them here (note they are fairly large: tarballs of up to 2GB each,
>>>>>>> untarred up to 12GB):
>>>>>>>
>>>>>>> *Job Manager:*
>>>>>>>
>>>>>>> after the slowdown series:
>>>>>>> https://storage.googleapis.com/dawn-flink/heapdumps/heapm-after.tar.gz
>>>>>>>
>>>>>>> *Task Manager 0:*
>>>>>>>
>>>>>>> after the slowdown series:
>>>>>>> https://storage.googleapis.com/dawn-flink/heapdumps/heapw0-after.tar.gz
>>>>>>> during the slowdown series:
>>>>>>> https://storage.googleapis.com/dawn-flink/heapdumps/heapw0-during.tar.gz
>>>>>>> during the slowdown series:
>>>>>>> https://storage.googleapis.com/dawn-flink/heapdumps/heapw0-during2.tar.gz
>>>>>>>
>>>>>>> *Task Manager 1:*
>>>>>>>
>>>>>>> after the slowdown series:
>>>>>>> https://storage.googleapis.com/dawn-flink/heapdumps/heapw1-after.tar.gz
>>>>>>> during the slowdown series:
>>>>>>> https://storage.googleapis.com/dawn-flink/heapdumps/heapw1-during.tar.gz
>>>>>>> during the slowdown series:
>>>>>>> https://storage.googleapis.com/dawn-flink/heapdumps/heapw1-during2.tar.gz
>>>>>>>
>>>>>>> I've also attempted to reproduce the issue in a local cluster with a
>>>>>>> single 4-CPU task manager, but no slowdown has occurred so far.
>>>>>>>
>>>>>>> Please let me know if there is anything else I can provide you with.
>>>>>>>
>>>>>>> - Pieter
>>>>>>>
>>>>>>> 2016-01-28 22:18 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>>>>>>>
>>>>>>>> Hi!
>>>>>>>>
>>>>>>>> Thank you for looking into this.
>>>>>>>>
>>>>>>>> Such gradual slowdown is often caused by memory leaks, which reduce
>>>>>>>> available memory and make GC more expensive.
>>>>>>>>
>>>>>>>> Can you create a heap dump after the series of runs that cause the
>>>>>>>> slowdown? Then we can have a look at whether there are leaked Flink
>>>>>>>> objects, or whether the user code is causing that.
>>>>>>>>
>>>>>>>> Greetings,
>>>>>>>> Stephan
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jan 28, 2016 at 5:58 PM, Pieter Hameete <phameete@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Till and Stephan,
>>>>>>>>>
>>>>>>>>> I've got some additional information. I just ran a query 60 times
>>>>>>>>> in a row, and all jobs ran in roughly the same time. I did this for
>>>>>>>>> both a simple job that requires intensive parsing and a more
>>>>>>>>> complex query that has several cogroups. Neither of the cases
>>>>>>>>> showed any slowing down. I believe this is an indication that it is
>>>>>>>>> not a problem with unreliable I/O or computational resources.
>>>>>>>>>
>>>>>>>>> However, when I now run the complete set of 15 different queries
>>>>>>>>> again, with 3 repetitions each, things start slowing down very
>>>>>>>>> quickly.
>>>>>>>>>
>>>>>>>>> The code I use to run the jobs can be found here:
>>>>>>>>> https://github.com/PHameete/dawn-flink/blob/feature/loading-algorithm-v2/src/main/scala/wis/dawnflink/performance/DawnBenchmarkSuite.scala
>>>>>>>>>
>>>>>>>>> I run this as follows in the GCloud: ./flink run dawn-flink.jar
>>>>>>>>> gs://dawn-flink/data/split4G gs://dawn-flink/output/ 3 result.csv
>>>>>>>>>
>>>>>>>>> Is there something I am doing incorrectly by running these
>>>>>>>>> different queries in sequence like this? I have attempted
>>>>>>>>> parametrizing further so I would call ./flink run for each separate
>>>>>>>>> query, but this did not make a difference.
>>>>>>>>>
>>>>>>>>> Cheers!
>>>>>>>>>
>>>>>>>>> - Pieter
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2016-01-28 11:28 GMT+01:00 Pieter Hameete <phameete@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Hi Till, Stephan,
>>>>>>>>>>
>>>>>>>>>> Indeed I think you're right that it is just a coincidence that
>>>>>>>>>> the three jobs started producing output as soon as the first one
>>>>>>>>>> finished. Getting back to my comment that the job was much faster
>>>>>>>>>> after the upgrade to Flink 0.10.1, I searched a bit further. I've
>>>>>>>>>> now noticed that after setting up a fresh GCloud cluster the job
>>>>>>>>>> runs roughly 15x faster than the case I just showed you. I'm
>>>>>>>>>> executing many, many jobs in sequence and it seems as if
>>>>>>>>>> performance is degrading a lot over time. Let me tell you what
>>>>>>>>>> I'm doing exactly:
>>>>>>>>>>
>>>>>>>>>> I'm running a set of 16 queries in sequence on the cluster. Each
>>>>>>>>>> query is executed multiple (5) times in a row so I can average
>>>>>>>>>> measurements for a more reliable estimate of performance in the
>>>>>>>>>> GCloud. This means that I'm running a total of 60 jobs in
>>>>>>>>>> sequence, on 5 different datasets, totalling 300 jobs. For each
>>>>>>>>>> job I get the execution environment, define the query, write to
>>>>>>>>>> an output file and run env.execute, as in the sketch below. After
>>>>>>>>>> each job I clear the output folder so the next job can start
>>>>>>>>>> fresh.
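>>>>>>>>>>
>>>>>>>>>> A minimal sketch of that loop (hypothetical names; the actual
>>>>>>>>>> code is in DawnBenchmarkSuite.scala):
>>>>>>>>>>
>>>>>>>>>>   import org.apache.flink.api.scala._
>>>>>>>>>>   import org.apache.flink.core.fs.FileSystem.WriteMode
>>>>>>>>>>
>>>>>>>>>>   for (query <- queries; rep <- 1 to repetitions) {
>>>>>>>>>>     val env = ExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>     val result = buildQuery(env, query)       // define the query
>>>>>>>>>>     result.writeAsText(outputPath, WriteMode.OVERWRITE)
>>>>>>>>>>     val job = env.execute(s"$query rep $rep") // run the job
>>>>>>>>>>     runtimes += job.getNetRuntime             // net runtime in ms
>>>>>>>>>>     clearOutput(outputPath)                   // start fresh
>>>>>>>>>>   }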
>>>>>>>>>>
>>>>>>>>>> Now I'm wondering what causes this performance degradation. Could
>>>>>>>>>> this have to do with the GCloud (I surely hope they don't scale
>>>>>>>>>> back my performance by 90% ;-) ), do I need to do some more
>>>>>>>>>> cleaning and resetting on the Flink cluster, or is my approach
>>>>>>>>>> above asking for trouble?
>>>>>>>>>>
>>>>>>>>>> Thanks again for your help!
>>>>>>>>>>
>>>>>>>>>> - Pieter
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 2016-01-28 10:58 GMT+01:00 Till Rohrmann <trohrmann@apache.org>:
>>>>>>>>>>
>>>>>>>>>>> Hi Pieter,
>>>>>>>>>>>
>>>>>>>>>>> you can see in the log that the operators are all started at the
>>>>>>>>>>> same time. However, you're right that they don't finish at the
>>>>>>>>>>> same time. The sub tasks which run on the same node exhibit a
>>>>>>>>>>> similar runtime. However, all nodes (not only hadoop-w-0 compared
>>>>>>>>>>> to the others) show different runtimes. I would guess that this
>>>>>>>>>>> is due to some other load on the GCloud machines or some other
>>>>>>>>>>> kind of asymmetry between the hosts.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Till
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jan 28, 2016 at 10:17 AM, Pieter Hameete
>>>>>>>>>>> <phameete@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Stephan, Till,
>>>>>>>>>>>>
>>>>>>>>>>>> I've watched the job again; please see the log of the CoGroup
>>>>>>>>>>>> operator:
>>>>>>>>>>>>
>>>>>>>>>>>> [image: inline image 1 (screenshot of the CoGroup operator log)]
>>>>>>>>>>>>
>>>>>>>>>>>> All workers get to process a fairly even share of the bytes and
>>>>>>>>>>>> records, BUT hadoop-w-0, hadoop-w-2 and hadoop-w-3 don't start
>>>>>>>>>>>> working until hadoop-w-1 is finished. Is this behavior to be
>>>>>>>>>>>> expected with a CoGroup, or could there still be something
>>>>>>>>>>>> wrong in the distribution of the data?
>>>>>>>>>>>>
>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>
>>>>>>>>>>>> Pieter
>>>>>>>>>>>>
>>>>>>>>>>>> 2016-01-27 21:48 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Pieter!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Interesting, but good :-)
>>>>>>>>>>>>>
>>>>>>>>>>>>> I don't think we did much on the hash functions since 0.9.1. I
>>>>>>>>>>>>> am a bit surprised that it made such a difference. Well, as
>>>>>>>>>>>>> long as it improves with the newer version :-)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Greetings,
>>>>>>>>>>>>> Stephan
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jan 27, 2016 at 9:42 PM, Pieter Hameete
>>>>>>>>>>>>> <phameete@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I've upgraded to Flink 0.10.1 and ran the job again, without
>>>>>>>>>>>>>> any changes to the code, to see the bytes input and output of
>>>>>>>>>>>>>> the operators for the different workers. To my surprise it is
>>>>>>>>>>>>>> very well balanced between all workers, and because of this
>>>>>>>>>>>>>> the job completed much faster.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Are there any changes/fixes between Flink 0.9.1 and 0.10.1
>>>>>>>>>>>>>> that could cause this to be better for me now?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Pieter
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2016-01-27 14:10 GMT+01:00 Pieter Hameete <phameete@gmail.com>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers for the quick reply Till.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> That would be very useful information to have! I'll upgrade
>>>>>>>>>>>>>>> my project to Flink 0.10.1 tonight and let you know if I can
>>>>>>>>>>>>>>> find out whether there's a skew in the data :-)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - Pieter
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2016-01-27 13:49 GMT+01:00 Till Rohrmann <trohrmann@apache.org>:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Could it be that your data is skewed? This could lead to
>>>>>>>>>>>>>>>> different loads on different task managers.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> With the latest Flink version, the web interface should
>>>>>>>>>>>>>>>> show you how many bytes each operator has written and
>>>>>>>>>>>>>>>> received. There you could see if one operator receives more
>>>>>>>>>>>>>>>> elements than the others.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Jan 27, 2016 at 1:35 PM, Pieter Hameete
>>>>>>>>>>>>>>>> <phameete@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi guys,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Currently I am running a job in the GCloud in a
>>>>>>>>>>>>>>>>> configuration with 4 task managers that each have 4 CPUs
>>>>>>>>>>>>>>>>> (for a total parallelism of 16).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> However, I noticed my job is running much slower than
>>>>>>>>>>>>>>>>> expected and after some more investigation I found that
>>>>>>>>>>>>>>>>> one of the workers is doing a majority of the work (its
>>>>>>>>>>>>>>>>> CPU load was at 100% while the others were almost idle).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> My job execution plan can be found here:
>>>>>>>>>>>>>>>>> http://i.imgur.com/fHKhVFf.png
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The input is split into multiple files so loading the data
>>>>>>>>>>>>>>>>> is properly distributed over the workers.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I am wondering if you can provide me with some tips on how
>>>>>>>>>>>>>>>>> to figure out what is going wrong here:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>    - Could this imbalance in workload be the result of an
>>>>>>>>>>>>>>>>>      imbalance in the hash partitioning?
>>>>>>>>>>>>>>>>>    - Is there a convenient way to see how many elements
>>>>>>>>>>>>>>>>>      each worker gets to process? Would it work to write
>>>>>>>>>>>>>>>>>      the output of the CoGroup to disk, since each worker
>>>>>>>>>>>>>>>>>      writes to its own output file, and investigate the
>>>>>>>>>>>>>>>>>      differences?
>>>>>>>>>>>>>>>>>    - Is there something strange about the execution plan
>>>>>>>>>>>>>>>>>      that could cause this?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks and kind regards,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Pieter
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
