flink-user mailing list archives

From "Florian Hönicke" <rockstar...@gmail.com>
Subject Re: long runtime
Date Thu, 02 Oct 2014 17:29:58 GMT
Thanks a lot :)
I added some semantic annotations.
Now it takes 2 minutes.

On 25.09.2014 at 11:32, Fabian Hueske wrote:
> Your program is doing quite a few repartitioning steps, where all data 
> comes from a single data source.
> You could try two things:
> - triple the DataSource and Map function that feed the two 
> Signature FlatMaps and the two later CoGroups, so that you have one 
> source->map for each of the two FlatMaps and a third one for the two 
> later CoGroups.
> - check whether semantic annotations can help you avoid expensive 
> repartitioning and sorting for the CoGroups 
> (http://flink.incubator.apache.org/docs/0.6-incubating/java_api_guide.html).
>
> Best, Fabian
>
> 2014-09-25 10:51 GMT+02:00 Fabian Hueske <fhueske@apache.org 
> <mailto:fhueske@apache.org>>:
>
>     Hi,
>
>     the plan shows all operator DOPs as 1.
>     Did you create the plan locally or on the cluster with the correct
>     DOP? The CLI client offers the -p parameter also for "info -e".
>
>     BTW, you could try to set the DOP to the number of cores in your
>     cluster. (But that doesn't explain why the job is so slow).
>
>     2014-09-25 10:01 GMT+02:00 Florian Hönicke <rockstarflo@gmail.com
>     <mailto:rockstarflo@gmail.com>>:
>
>         Yes, I also ran the massJoin on the cluster with 500 MB.
>         I attached the execution plan.
>
>         Greetings,
>         Florian
>
>
>         On 25.09.2014 at 00:41, Fabian Hueske wrote:
>>         OK, the log shows that the tasks are evenly distributed to
>>         all nodes.
>>         I assume you ran the program on the cluster with 500 MB as
>>         well, right?
>>
>>         Can you please also post the execution plan for the cluster
>>         execution?
>>         You get it with (See also:
>>         http://flink.incubator.apache.org/docs/0.6-incubating/cli.html):
>>         ./flink info -e jarfile.jar <parameters>
>>
>>         Thanks, Fabian
>>
>>         2014-09-25 0:21 GMT+02:00 Florian Hönicke
>>         <rockstarflo@gmail.com <mailto:rockstarflo@gmail.com>>:
>>
>>             Thanks for your quick answer.
>>             In the following, I roughly sketch the mass-join algorithm.
>>             http://www.cs.berkeley.edu/~jnwang/papers/icde14_massjoin.pdf
>>             <http://www.cs.berkeley.edu/%7Ejnwang/papers/icde14_massjoin.pdf>
>>
>>             It's an R-S join which I modified into a self-join.
>>             Given a collection of token sets, the massJoin finds all
>>             similar sets with respect to the Jaccard similarity
>>             (|intersection| / |union|).
>>             First, it calculates a global token grouping, i.e., each
>>             token is assigned to one of 30 groups. Each group has
>>             almost the same token count.
>>             Then, it generates two types of signatures for each input
>>             set.
>>             If two sets are similar, they must share a common signature.
>>             In the next step, we find all candidate pairs (pairs
>>             which share a common signature).
>>             Some candidate pairs are filtered using the global token
>>             grouping.
>>             The remaining candidate pairs are verified to filter out
>>             all dissimilar pairs.
>>
>>             @Fabian
>>             I specified the DOP via the command-line client as follows:
>>             /home/hoenicke/flink-0.6-incubating/bin/flink run -p 11
>>             /home/hoenicke/flink-0.6-incubating/jar/mass6.jar 0.9
>>             \file:///home/hoenicke/flink-0.6-incubating/input/inputNummeriert.txt
>>             file:///home/hoenicke/flink-0.6-incubating/output -v
>>
>>             The log file is attached.
>>
>>             Best, Florian
>>
>>             On 24.09.2014 at 22:45, Fabian Hueske wrote:
>>>             Hi,
>>>
>>>             how did you specify the degree of parallelism (DOP) for
>>>             your program?
>>>             Via the command-line client, the system configuration, or
>>>             otherwise?
>>>
>>>             The JobManager log file (./log/*jobManager*.log)
>>>             contains the DOP of each task.
>>>
>>>             Best, Fabian
>>>
>>>             2014-09-24 18:41 GMT+02:00 Stephan Ewen
>>>             <sewen@apache.org <mailto:sewen@apache.org>>:
>>>
>>>                 Hi!
>>>
>>>                 Offhand, that is not easy to say. It depends on your
>>>                 algorithm, how much data replication it does...
>>>
>>>                 We'd need a bit of time to look into the code. It
>>>                 would help if you could roughly sketch the algorithm
>>>                 for us and give us a breakdown of how much time is
>>>                 spent in which operator (like a screenshot of the
>>>                 runtime web monitor).
>>>
>>>                 Greetings,
>>>                 Stephan
>>>
>>>
>>>                 On Wed, Sep 24, 2014 at 6:18 PM, Florian Hönicke
>>>                 <rockstarflo@gmail.com
>>>                 <mailto:rockstarflo@gmail.com>> wrote:
>>>
>>>                     Hello :)
>>>
>>>                     My Flink program is extremely slow.
>>>                     I implemented a set similarity join in Flink
>>>                     (Mass-Join).
>>>                     Furthermore, I implemented a local version in Java.
>>>                     I compared both implementations.
>>>                     The local version needs one minute to process a
>>>                     500 MB dataset.
>>>                     My Flink program needs 5 minutes (cluster: 11
>>>                     nodes, 20 000 MB RAM).
>>>                     I use Flink version 0.6.
>>>                     What could be the cause?
>>>
>>>                     I would welcome your response,
>>>                     Florian Hönicke
>>>
>>>
>>>
>>
>>
>
>
>
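
The verification step described in the thread compares token sets by Jaccard similarity (intersection size divided by union size). The following is a minimal plain-Java sketch of that measure only, not the poster's actual code and not the Flink job. The class name `JaccardSketch`, the sample token sets, and the reading of the `0.9` command-line argument as a similarity threshold are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper class, not taken from the thread.
public class JaccardSketch {

    // Jaccard similarity = |A ∩ B| / |A ∪ B| for two token sets.
    public static double jaccard(Set<String> a, Set<String> b) {
        if (a.isEmpty() && b.isEmpty()) {
            return 1.0; // convention chosen here for two empty sets
        }
        // Iterate over the smaller set and probe the larger one.
        Set<String> smaller = a.size() <= b.size() ? a : b;
        Set<String> larger = a.size() <= b.size() ? b : a;
        int intersection = 0;
        for (String token : smaller) {
            if (larger.contains(token)) {
                intersection++;
            }
        }
        // |A ∪ B| = |A| + |B| - |A ∩ B|
        int union = a.size() + b.size() - intersection;
        return (double) intersection / union;
    }

    public static void main(String[] args) {
        // Made-up token sets for illustration.
        Set<String> r = new HashSet<>(Arrays.asList("a", "b", "c", "d"));
        Set<String> s = new HashSet<>(Arrays.asList("b", "c", "d", "e"));
        double threshold = 0.9; // assumed meaning of the CLI argument
        double sim = jaccard(r, s);
        System.out.println(sim + " similar=" + (sim >= threshold));
    }
}
```

A candidate pair would pass verification only if its similarity reaches the threshold. The signature and filtering steps exist to keep the number of such exact comparisons small.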

