flink-user mailing list archives

From Pieter Hameete <phame...@gmail.com>
Subject Re: Imbalanced workload between workers
Date Sun, 07 Feb 2016 13:48:03 GMT
Hi Ufuk,

You can find the job jar here:
https://storage.googleapis.com/dawn-flink/dawn-flink-improved.jar

I've used the Google Cloud setup with bdutil as described on
https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/gce_setup.html
but with a hadoop2 env, so deploying the cluster as follows:

./bdutil -e  hadoop2_env.sh -e extensions/flink/flink_env.sh deploy

You should be able to reproduce with a JobManager and 2 TaskManagers. I
used n1-standard-4 instances for both. Flink version used is 0.10.1.

I ran the job on 4 different-size datasets in sequence, as follows:

./flink run dawn-flink-improved.jar gs://dawn-flink/data/split8G
<OUTFOLDER> 5 improved-8G.csv &&
./flink run dawn-flink-improved.jar gs://dawn-flink/data/split4G
<OUTFOLDER> 5 improved-4G.csv &&
./flink run dawn-flink-improved.jar gs://dawn-flink/data/split2G
<OUTFOLDER> 5 improved-2G.csv &&
./flink run dawn-flink-improved.jar gs://dawn-flink/data/split1G
<OUTFOLDER> 5 improved-1G.csv

The datasets should be publicly available now; they are in the
europe-west-1c region.

Note that each of these runs 80 jobs (16 different queries, 5 repetitions
each). You should see the slowdown happening by the second or third batch
of jobs: even though the dataset size is halved each time, for me the
batches started taking much more time to complete.

Can you work with this info? If there is anything else you need, please let
me know!

- Pieter

2016-02-07 9:20 GMT+01:00 Ufuk Celebi <uce@apache.org>:

> Hey Pieter! Thanks for investigating this. If you can share the job and
> setup details (instance types, Hadoop, etc.) I can also run it. :-)
>
> – Ufuk
>
>
> On Saturday, February 6, 2016, Pieter Hameete <phameete@gmail.com> wrote:
>
>> Hi Stephan,
>>
>> Thank you for your time looking into this. I might be able to check those
>> things later if you believe it would be interesting to find the source of
>> this problem.
>>
>> For me the Google Cloud is getting too expensive to use right now.
>> Instead I found a YARN cluster that I can use for free and I'm hoping that
>> getting fresh containers for each job will be a good workaround until then.
>>
>> Cheers,
>>
>> - Pieter
>>
>>
>>
>> 2016-02-06 15:12 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>>
>>> Hi!
>>>
>>> Then the heap dump looks actually non-suspicious. The 8.x GB of managed
>>> memory corresponds to what is expected.
>>>
>>> If there is no memory leak, what could be the reason?
>>>
>>>   - Thread leaks? You can try and create thread dumps (via jstack) after
>>> the slowdown (between invocations of the program).
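>>>
>>>     A rough sketch (jps and jstack ship with the JDK; the output path is
>>>     made up):
>>>
>>>       jps | grep TaskManager                   # find the TaskManager PID
>>>       jstack <pid> > /tmp/tm-threads-after.txt # snapshot all thread stacks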
>>>
>>>   - Is the JIT compiler going into weird behavior, in the end not
>>> compiling as aggressively? Maybe classes do not get properly unloaded,
>>> or the code cache (where JITted assembler code is stored) overflows.
>>>
>>> Can you check at the beginning of the log files which garbage collectors
>>> are set? Is class unloading activated?
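>>>
>>> For example, GC and class-unloading logging could be enabled via
>>> env.java.opts in flink-conf.yaml (a sketch; these are standard HotSpot
>>> flags, so verify them for your JVM):
>>>
>>>   env.java.opts: -verbose:gc -XX:+PrintGCDetails -XX:+TraceClassUnloading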
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>>
>>> On Fri, Feb 5, 2016 at 6:14 PM, Pieter Hameete <phameete@gmail.com>
>>> wrote:
>>>
>>>> Hi Stephan,
>>>>
>>>> cheers for looking at this :-) The TaskManagers have 12GB of available
>>>> heap space configured using taskmanager.heap.mb. The configurations
>>>> that you mention should be at their defaults.
>>>>
>>>> - Pieter
>>>>
>>>>
>>>>
>>>> 2016-02-05 18:10 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>>>>
>>>>> EDIT: The byte[] make up 98% of the memory. That seems correct. 80%
>>>>> would mean something else is consuming a lot, which should not be the
>>>>> case.
>>>>>
>>>>> On Fri, Feb 5, 2016 at 6:09 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>>>
>>>>>> You are using Flink 0.10, right?
>>>>>>
>>>>>> The byte[] are simply Flink's reserved managed memory. It is expected
>>>>>> that they occupy a large portion of the memory (assuming you run 0.10
>>>>>> in BATCH mode).
>>>>>> The number of these byte[] should also not grow or shrink, so they
>>>>>> should not cause degradation.
>>>>>>
>>>>>> I can also not spot anything suspicious (in the after-slowdown dumps).
>>>>>> The fact that the byte[] make up 80% of the memory means that nothing
>>>>>> else can really be leaking a lot.
>>>>>>
>>>>>> Did you configure a specific amount of memory
>>>>>> (taskmanager.memory.size or taskmanager.memory.fraction)?
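>>>>>>
>>>>>> For example, in flink-conf.yaml (values purely illustrative):
>>>>>>
>>>>>>   taskmanager.heap.mb: 12288
>>>>>>   taskmanager.memory.fraction: 0.7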
>>>>>>
>>>>>> Greetings,
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Feb 3, 2016 at 10:04 AM, Pieter Hameete <phameete@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Stephan,
>>>>>>>
>>>>>>> did you manage to take a look at the dumps? I have looked myself but
>>>>>>> I could not find anything that would point towards a memory leak in
>>>>>>> my user code. I have a parser that creates quite a lot of objects,
>>>>>>> but these are not abundantly present in the heap dump. I did see that
>>>>>>> the majority of the heap consisted of byte[] objects (but this is
>>>>>>> normal?), as well as Akka objects in another dump.
>>>>>>>
>>>>>>> I am not familiar enough with Flink to know what is normal to see in
>>>>>>> the heap dumps. I am looking forward to hearing what you think :-)
>>>>>>>
>>>>>>> - Pieter
>>>>>>>
>>>>>>> 2016-01-29 13:36 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>>>>>>>
>>>>>>>> Super, thanks a lot.
>>>>>>>>
>>>>>>>> I think we need to find some time and look through this, but this
>>>>>>>> should help a lot!
>>>>>>>>
>>>>>>>> Greetings,
>>>>>>>> Stephan
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Jan 29, 2016 at 1:32 PM, Pieter Hameete <phameete@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Stephan,
>>>>>>>>>
>>>>>>>>> cheers for your response. I have the heap dumps, you can download
>>>>>>>>> them here (note they are fairly large: tarballs of up to 2GB each,
>>>>>>>>> untarred up to 12GB):
>>>>>>>>>
>>>>>>>>> *Job Manager:*
>>>>>>>>>
>>>>>>>>> after the slowdown series:
>>>>>>>>> https://storage.googleapis.com/dawn-flink/heapdumps/heapm-after.tar.gz
>>>>>>>>>
>>>>>>>>> *Task Manager 0:*
>>>>>>>>>
>>>>>>>>> after the slowdown series:
>>>>>>>>> https://storage.googleapis.com/dawn-flink/heapdumps/heapw0-after.tar.gz
>>>>>>>>> during the slowdown series:
>>>>>>>>> https://storage.googleapis.com/dawn-flink/heapdumps/heapw0-during.tar.gz
>>>>>>>>> during the slowdown series (second dump):
>>>>>>>>> https://storage.googleapis.com/dawn-flink/heapdumps/heapw0-during2.tar.gz
>>>>>>>>>
>>>>>>>>> *Task Manager 1:*
>>>>>>>>>
>>>>>>>>> after the slowdown series:
>>>>>>>>> https://storage.googleapis.com/dawn-flink/heapdumps/heapw1-after.tar.gz
>>>>>>>>> during the slowdown series:
>>>>>>>>> https://storage.googleapis.com/dawn-flink/heapdumps/heapw1-during.tar.gz
>>>>>>>>> during the slowdown series (second dump):
>>>>>>>>> https://storage.googleapis.com/dawn-flink/heapdumps/heapw1-during2.tar.gz
>>>>>>>>>
>>>>>>>>> I've also attempted to reproduce the issue in a local cluster with
>>>>>>>>> a single 4-CPU task manager, however no slowdown has occurred so
>>>>>>>>> far.
>>>>>>>>>
>>>>>>>>> Please let me know if there is anything else I can provide you
>>>>>>>>> with.
>>>>>>>>>
>>>>>>>>> - Pieter
>>>>>>>>>
>>>>>>>>> 2016-01-28 22:18 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>>>>>>>>>
>>>>>>>>>> Hi!
>>>>>>>>>>
>>>>>>>>>> Thank you for looking into this.
>>>>>>>>>>
>>>>>>>>>> Such gradual slowdown is often caused by memory leaks, which
>>>>>>>>>> reduce available memory and make GC more expensive.
>>>>>>>>>>
>>>>>>>>>> Can you create a heap dump after the series of runs that cause the
>>>>>>>>>> slowdown? Then we can have a look whether there are leaked Flink
>>>>>>>>>> objects, or if the user code is causing that.
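>>>>>>>>>>
>>>>>>>>>> A rough sketch with standard JDK tools (the exact flags can differ
>>>>>>>>>> per JDK version; <pid> is the TaskManager process id from jps):
>>>>>>>>>>
>>>>>>>>>>   jmap -dump:format=b,file=heap-tm.hprof <pid>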
>>>>>>>>>>
>>>>>>>>>> Greetings,
>>>>>>>>>> Stephan
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Jan 28, 2016 at 5:58 PM, Pieter Hameete <phameete@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Till and Stephan,
>>>>>>>>>>>
>>>>>>>>>>> I've got some additional information. I just ran a query 60 times
>>>>>>>>>>> in a row, and all jobs ran in roughly the same time. I did this
>>>>>>>>>>> for both a simple job that requires intensive parsing and a more
>>>>>>>>>>> complex query that has several cogroups. Neither of the cases
>>>>>>>>>>> showed any slowing down. I believe this is an indication that it
>>>>>>>>>>> is not a problem with unreliable I/O or computational resources.
>>>>>>>>>>>
>>>>>>>>>>> However, when I now run the complete set of 15 different queries
>>>>>>>>>>> again, with 3 repetitions each, things start slowing down very
>>>>>>>>>>> quickly.
>>>>>>>>>>>
>>>>>>>>>>> The code I use to run the jobs can be found here:
>>>>>>>>>>> https://github.com/PHameete/dawn-flink/blob/feature/loading-algorithm-v2/src/main/scala/wis/dawnflink/performance/DawnBenchmarkSuite.scala
>>>>>>>>>>>
>>>>>>>>>>> I run this as follows in the GCloud: ./flink run dawn-flink.jar
>>>>>>>>>>> gs://dawn-flink/data/split4G gs://dawn-flink/output/ 3 result.csv
>>>>>>>>>>>
>>>>>>>>>>> Is there something I am doing incorrectly by running these
>>>>>>>>>>> different queries in sequence like this? I have attempted
>>>>>>>>>>> parametrizing further so I would call ./flink run for each
>>>>>>>>>>> separate query, but this did not make a difference.
>>>>>>>>>>>
>>>>>>>>>>> Cheers!
>>>>>>>>>>>
>>>>>>>>>>> - Pieter
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 2016-01-28 11:28 GMT+01:00 Pieter Hameete <phameete@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Till, Stephan,
>>>>>>>>>>>>
>>>>>>>>>>>> Indeed I think you're right, it is just a coincidence that the
>>>>>>>>>>>> three jobs started producing output as soon as the first one
>>>>>>>>>>>> finished. Getting back to my comment that the job was much faster
>>>>>>>>>>>> after the upgrade to Flink 0.10.1, I searched a bit further. I've
>>>>>>>>>>>> now noticed that after setting up a fresh GCloud cluster the job
>>>>>>>>>>>> runs roughly 15x faster than the case I just showed you. I'm
>>>>>>>>>>>> executing many, many jobs in sequence and it seems as if
>>>>>>>>>>>> performance is degrading a lot over time. Let me tell you what
>>>>>>>>>>>> I'm doing exactly:
>>>>>>>>>>>>
>>>>>>>>>>>> I'm running a set of 16 queries in sequence on the cluster. Each
>>>>>>>>>>>> query is executed multiple (5) times in a row so I can average
>>>>>>>>>>>> measurements for a more reliable estimate of performance in the
>>>>>>>>>>>> GCloud. This means that I'm running a total of 60 jobs in
>>>>>>>>>>>> sequence, on each of 5 different datasets, totalling 300 jobs.
>>>>>>>>>>>> For each job I get the execution environment, define the query,
>>>>>>>>>>>> write to an output file and run env.execute. After each job I
>>>>>>>>>>>> clear the output folder so the next job can start fresh, roughly
>>>>>>>>>>>> as sketched below.
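>>>>>>>>>>>>
>>>>>>>>>>>> In simplified Scala the loop is shaped roughly like this (names
>>>>>>>>>>>> such as buildQuery and clearOutputFolder are made up; the real
>>>>>>>>>>>> code is in my DawnBenchmarkSuite on GitHub):
>>>>>>>>>>>>
>>>>>>>>>>>>   import org.apache.flink.api.scala._
>>>>>>>>>>>>
>>>>>>>>>>>>   for (query <- queries; run <- 1 to repetitions) {
>>>>>>>>>>>>     val env = ExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>>>     val result = buildQuery(query, env, inputPath) // define the query
>>>>>>>>>>>>     result.writeAsText(outputPath)        // write to an output file
>>>>>>>>>>>>     env.execute(s"$query run $run")       // run the job
>>>>>>>>>>>>     clearOutputFolder(outputPath)         // next job starts fresh
>>>>>>>>>>>>   }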
>>>>>>>>>>>>
>>>>>>>>>>>> Now I'm wondering what causes this performance degradation.
>>>>>>>>>>>> Could this have to do with the GCloud (I surely hope they don't
>>>>>>>>>>>> scale back my performance by 90% ;-) ), do I need to do some more
>>>>>>>>>>>> cleaning and resetting on the Flink cluster, or is my above
>>>>>>>>>>>> approach asking for trouble?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks again for your help!
>>>>>>>>>>>>
>>>>>>>>>>>> - Pieter
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> 2016-01-28 10:58 GMT+01:00 Till Rohrmann <trohrmann@apache.org>:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Pieter,
>>>>>>>>>>>>>
>>>>>>>>>>>>> you can see in the log that the operators are all started at
>>>>>>>>>>>>> the same time. However, you're right that they don't finish at
>>>>>>>>>>>>> the same time. The subtasks which run on the same node exhibit
>>>>>>>>>>>>> a similar runtime. However, all nodes (not only hadoop-w-0
>>>>>>>>>>>>> compared to the others) show different runtimes. I would guess
>>>>>>>>>>>>> that this is due to some other load on the GCloud machines or
>>>>>>>>>>>>> some other kind of asymmetry between the hosts.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Till
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jan 28, 2016 at 10:17 AM, Pieter Hameete <phameete@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Stephan, Till,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I've watched the job again; please see the log of the CoGroup
>>>>>>>>>>>>>> operator:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [image: Inline image 1]
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> All workers get to process a fairly distributed amount of
>>>>>>>>>>>>>> bytes and records, BUT hadoop-w-0, hadoop-w-2 and hadoop-w-3
>>>>>>>>>>>>>> don't start working until hadoop-w-1 is finished. Is this
>>>>>>>>>>>>>> behavior to be expected with a CoGroup, or could there still
>>>>>>>>>>>>>> be something wrong in the distribution of the data?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Pieter
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2016-01-27 21:48 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Pieter!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Interesting, but good :-)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I don't think we did much on the hash functions since 0.9.1.
>>>>>>>>>>>>>>> I am a bit surprised that it made such a difference. Well, as
>>>>>>>>>>>>>>> long as it improves with the newer version :-)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Greetings,
>>>>>>>>>>>>>>> Stephan
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Jan 27, 2016 at 9:42 PM, Pieter Hameete <phameete@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I've upgraded to Flink 0.10.1 and ran the job again, without
>>>>>>>>>>>>>>>> any changes to the code, to see the bytes input and output
>>>>>>>>>>>>>>>> of the operators for the different workers. To my surprise
>>>>>>>>>>>>>>>> it is very well balanced between all workers, and because of
>>>>>>>>>>>>>>>> this the job completed much faster.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Are there any changes/fixes between Flink 0.9.1 and 0.10.1
>>>>>>>>>>>>>>>> that could cause this to be better for me now?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Pieter
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2016-01-27 14:10 GMT+01:00 Pieter Hameete <phameete@gmail.com>:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Cheers for the quick reply Till.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> That would be very useful information to have! I'll upgrade
>>>>>>>>>>>>>>>>> my project to Flink 0.10.1 tonight and let you know if I
>>>>>>>>>>>>>>>>> can find out if there's a skew in the data :-)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> - Pieter
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2016-01-27 13:49 GMT+01:00 Till Rohrmann <trohrmann@apache.org>:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Could it be that your data is skewed? This could lead to
>>>>>>>>>>>>>>>>>> different loads on different task managers.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> With the latest Flink version, the web interface should
>>>>>>>>>>>>>>>>>> show you how many bytes each operator has written and
>>>>>>>>>>>>>>>>>> received. There you could see if one operator receives
>>>>>>>>>>>>>>>>>> more elements than the others.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Jan 27, 2016 at 1:35 PM, Pieter Hameete <phameete@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi guys,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Currently I am running a job in the GCloud in a
>>>>>>>>>>>>>>>>>>> configuration with 4 task managers that each have 4 CPUs
>>>>>>>>>>>>>>>>>>> (for a total parallelism of 16).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> However, I noticed my job is running much slower than
>>>>>>>>>>>>>>>>>>> expected, and after some more investigation I found that
>>>>>>>>>>>>>>>>>>> one of the workers is doing a majority of the work (its
>>>>>>>>>>>>>>>>>>> CPU load was at 100% while the others were almost idle).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> My job execution plan can be found here:
>>>>>>>>>>>>>>>>>>> http://i.imgur.com/fHKhVFf.png
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The input is split into multiple files, so loading the
>>>>>>>>>>>>>>>>>>> data is properly distributed over the workers.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I am wondering if you can provide me with some tips on
>>>>>>>>>>>>>>>>>>> how to figure out what is going wrong here:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>    - Could this imbalance in workload be the result of an
>>>>>>>>>>>>>>>>>>>    imbalance in the hash partitioning?
>>>>>>>>>>>>>>>>>>>    - Is there a convenient way to see how many elements
>>>>>>>>>>>>>>>>>>>    each worker gets to process? Would it work to write
>>>>>>>>>>>>>>>>>>>    the output of the CoGroup to disk, because each worker
>>>>>>>>>>>>>>>>>>>    writes to its own output file, and investigate the
>>>>>>>>>>>>>>>>>>>    differences?
>>>>>>>>>>>>>>>>>>>    - Is there something strange about the execution plan
>>>>>>>>>>>>>>>>>>>    that could cause this?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks and kind regards,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Pieter
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
