flink-user mailing list archives

From "Florian Hönicke" <rockstar...@gmail.com>
Subject Re: Re: long runtime
Date Thu, 02 Oct 2014 17:59:13 GMT
The code is attached.
Input format:
<SetID=1, token_1, token_7, token_11, token_20...token_i>
<SetID=2, token_2, token_4...token_j>
....
In the file it looks like:
1 1,7,11,20
2 2,4
We assume that all tokens (token_1...token_n) are sorted by their global 
token frequency.
Token_1 is the least frequent token and token_n is the most frequent token.
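
For illustration, one such line can be parsed into a (set ID, token array)
pair roughly as follows; this is only a sketch with made-up names, not the
attached code:

    public final class InputLine {
        final int setId;
        final int[] tokens;   // token IDs, assumed sorted by global frequency

        InputLine(int setId, int[] tokens) {
            this.setId = setId;
            this.tokens = tokens;
        }

        // parses a line like "1 1,7,11,20" into setId=1, tokens=[1,7,11,20]
        static InputLine parse(String line) {
            String[] parts = line.split("\\s+", 2);      // "1" | "1,7,11,20"
            String[] tokenStrings = parts[1].split(",");
            int[] tokens = new int[tokenStrings.length];
            for (int i = 0; i < tokenStrings.length; i++) {
                tokens[i] = Integer.parseInt(tokenStrings[i].trim());
            }
            return new InputLine(Integer.parseInt(parts[0]), tokens);
        }
    }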

Greetings Florian

-------- Original Message --------
Subject: 	Re: long runtime
Date: 	Thu, 2 Oct 2014 19:42:58 +0200
From: 	Flavio Pompermaier <pompermaier@okkam.it>
Reply-To: 	user@flink.incubator.apache.org
To: 	user@flink.incubator.apache.org


Could you share the code? It sounds interesting to try!

On Oct 2, 2014 7:31 PM, "Florian Hönicke" <rockstarflo@gmail.com 
<mailto:rockstarflo@gmail.com>> wrote:

    Thanks a lot :)
    I set some semantic annotations.
    Now it needs 2 minutes.
    Edit: tripling the DataSource has no influence.
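
    For illustration, a semantic annotation on a map function looks roughly
    like the sketch below. The ForwardedFields name is from later Flink
    versions (the 0.6 guide describes the same idea under constant field
    annotations); this is not the program's actual code:

        import org.apache.flink.api.common.functions.MapFunction;
        import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
        import org.apache.flink.api.java.tuple.Tuple2;

        // f0 (the set ID) passes through unchanged, so the optimizer may keep
        // an existing partitioning or sort order on f0 for the next CoGroup.
        @ForwardedFields("f0")
        public class TrimTokens
                implements MapFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
            @Override
            public Tuple2<Integer, String> map(Tuple2<Integer, String> value) {
                return new Tuple2<>(value.f0, value.f1.trim()); // only f1 is rewritten
            }
        }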

    On 25.09.2014 11:32, Fabian Hueske wrote:
>     Your program is doing quite a few repartitioning steps, where all
>     data comes from a single data source.
>     You could try two things:
>     - triple the DataSource and the Map function that feed the two
>     Signature FlatMaps and the two later CoGroups, so that there is
>     one source->map chain for each of the two FlatMaps and a third
>     one for the two later CoGroups.
>     - check whether SemanticAnnotations can help you prevent the
>     expensive repartitioning and sorting for the CoGroups
>     (http://flink.incubator.apache.org/docs/0.6-incubating/java_api_guide.html).
>
>     Best, Fabian
>
>     2014-09-25 10:51 GMT+02:00 Fabian Hueske <fhueske@apache.org
>     <mailto:fhueske@apache.org>>:
>
>         Hi,
>
>         the plan shows all operator DOPs as 1.
>         Did you create the plan locally or on the cluster with the
>         correct DOP? The CLI client also offers the -p parameter for
>         "info -e".
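>         For example, to get the plan with the cluster DOP, something like:
>         ./flink info -e -p 11 jarfile.jar <parameters>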
>
>         BTW, you could try to set the DOP to the number of cores in
>         your cluster. (But that doesn't explain why the job is so slow).
>
>         2014-09-25 10:01 GMT+02:00 Florian Hönicke
>         <rockstarflo@gmail.com <mailto:rockstarflo@gmail.com>>:
>
>             Yes, I ran the massJoin on the cluster on 500MB as well.
>             I attached the execution plan.
>
>             Greetings,
>             Florian
>
>
>             On 25.09.2014 at 00:41, Fabian Hueske wrote:
>>             OK, the log shows that the tasks are evenly distributed
>>             to all nodes.
>>             I assume you ran the program on the cluster on 500MB as
>>             well, right?
>>
>>             Can you please also post the execution plan for the
>>             cluster execution?
>>             You get it with (See also:
>>             http://flink.incubator.apache.org/docs/0.6-incubating/cli.html):
>>             ./flink info -e jarfile.jar <parameters>
>>
>>             Thanks, Fabian
>>
>>             2014-09-25 0:21 GMT+02:00 Florian Hönicke
>>             <rockstarflo@gmail.com <mailto:rockstarflo@gmail.com>>:
>>
>>                 Thanks for your quick answer.
>>                 In the following, I roughly sketch the mass-join
>>                 algorithm.
>>                 http://www.cs.berkeley.edu/~jnwang/papers/icde14_massjoin.pdf
>>                 <http://www.cs.berkeley.edu/%7Ejnwang/papers/icde14_massjoin.pdf>
>>
>>                 It's an R-S join, which I modified into a self-join.
>>                 Given a collection of token sets, the massJoin finds
>>                 all similar sets (with respect to the Jaccard
>>                 similarity, intersection/union).
>>                 First, it calculates a global token grouping, i.e.,
>>                 each token is assigned to one of 30 groups. Each
>>                 group has almost the same token count.
>>                 Then, it generates two types of signatures for each
>>                 input set.
>>                 If two sets are similar, they must share a common
>>                 signature.
>>                 In the next step, we find all candidate pairs (pairs
>>                 which share a common signature).
>>                 Some candidate pairs are filtered using the global
>>                 token grouping.
>>                 The remaining candidate pairs are verified to filter
>>                 out all dissimilar pairs.
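>>
>>                 For illustration, the verification step boils down to
>>                 a plain Jaccard check like the one below (names are
>>                 made up, not the attached code):
>>
>>                 import java.util.HashSet;
>>                 import java.util.Set;
>>
>>                 public class JaccardVerifier {
>>                     // keep a candidate pair only if
>>                     // |intersection| / |union| >= threshold
>>                     public static boolean isSimilar(Set<Integer> r,
>>                             Set<Integer> s, double threshold) {
>>                         Set<Integer> common = new HashSet<>(r);
>>                         common.retainAll(s);   // intersection
>>                         int union = r.size() + s.size() - common.size();
>>                         return union > 0
>>                             && (double) common.size() / union >= threshold;
>>                     }
>>                 }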
>>
>>                 @Fabian
>>                 I specified the DOP via the command-line client as
>>                 follows:
>>                 /home/hoenicke/flink-0.6-incubating/bin/flink run -p
>>                 11 /home/hoenicke/flink-0.6-incubating/jar/mass6.jar
>>                 0.9
>>                 file:///home/hoenicke/flink-0.6-incubating/input/inputNummeriert.txt
>>                 file:///home/hoenicke/flink-0.6-incubating/output -v
>>
>>                 The log file is attached.
>>
>>                 Best, Florian
>>
>>                 On 24.09.2014 at 22:45, Fabian Hueske wrote:
>>>                 Hi,
>>>
>>>                 how did you specify the degree of parallelism
>>>                 (DOP) for your program?
>>>                 Via the command-line client, the system
>>>                 configuration, or otherwise?
>>>
>>>                 The JobManager log file (./log/*jobManager*.log)
>>>                 contains the DOP of each task.
>>>
>>>                 Best, Fabian
>>>
>>>                 2014-09-24 18:41 GMT+02:00 Stephan Ewen
>>>                 <sewen@apache.org <mailto:sewen@apache.org>>:
>>>
>>>                     Hi!
>>>
>>>                     Ad hoc, that is not easy to say. It depends on
>>>                     your algorithm and how much data replication it
>>>                     does...
>>>
>>>                     We'd need a bit of time to look into the code.
>>>                     It would help if you could roughly sketch the
>>>                     algorithm for us and give us a breakdown of how
>>>                     much time is spent in which operator (like a
>>>                     screenshot of the runtime web monitor).
>>>
>>>                     Greetings,
>>>                     Stephan
>>>
>>>
>>>                     On Wed, Sep 24, 2014 at 6:18 PM, Florian Hönicke
>>>                     <rockstarflo@gmail.com
>>>                     <mailto:rockstarflo@gmail.com>> wrote:
>>>
>>>                         Hello :)
>>>
>>>                         my Flink program is extremely slow.
>>>                         I implemented a set similarity join in Flink
>>>                         (Mass-Join).
>>>                         Furthermore, I implemented a local version
>>>                         in Java.
>>>                         I compared both implementations.
>>>                         The local version needs one minute to
>>>                         process a 500MB dataset.
>>>                         My Flink program needs 5 minutes (cluster:
>>>                         11 nodes, 20,000 MB RAM).
>>>                         I use Flink version 0.6.
>>>                         What could be the cause?
>>>
>>>                         I would welcome your response,
>>>                         Florian Hönicke
>>>
>>>
>>>
>>
>>
>
>
>



