flink-user mailing list archives

From Fabian Hueske <fhue...@apache.org>
Subject Re: long runtime
Date Thu, 25 Sep 2014 09:32:14 GMT
Your program performs quite a few repartitioning steps, and all data
comes from a single data source.
You could try two things:
- Triplicate the DataSource and Map function that feed the two Signature
FlatMaps and the two later CoGroups, so that you have two source->map
chains, one for each FlatMap, and another one for the two later CoGroups.
- Check whether SemanticAnnotations can help you avoid expensive
repartitioning and sorting for the CoGroups (
http://flink.incubator.apache.org/docs/0.6-incubating/java_api_guide.html).
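
The second suggestion can be illustrated with a small, framework-free sketch.
It is not Flink code and uses no Flink API; all function names below are
hypothetical. It only shows the idea behind such annotations, assuming
hash partitioning on a key field: if a function forwards the key field
unchanged, the existing partitioning stays valid and no reshuffle is needed,
whereas a function that rewrites the key invalidates it.

```python
def partition_by_key(records, key_field, parallelism):
    """Hash-partition tuples on one field into `parallelism` buckets."""
    partitions = [[] for _ in range(parallelism)]
    for record in records:
        partitions[hash(record[key_field]) % parallelism].append(record)
    return partitions


def is_partitioned_by(partitions, key_field):
    """True if every record sits in the bucket its key hashes to."""
    n = len(partitions)
    return all(
        hash(record[key_field]) % n == index
        for index, bucket in enumerate(partitions)
        for record in bucket
    )


def keeps_key(record):
    # Field 0 (the key) is forwarded unchanged; only field 1 is modified.
    return (record[0], record[1] * 2)


def changes_key(record):
    # Field 0 is rewritten, so the old partitioning no longer holds.
    return (record[0] + 1, record[1])


def map_partitions(partitions, function):
    """Apply a function record by record without moving data between buckets."""
    return [[function(record) for record in bucket] for bucket in partitions]


records = [(k, k * 10) for k in range(20)]
parts = partition_by_key(records, key_field=0, parallelism=4)

# Key preserved: a later grouping on field 0 could reuse the partitioning.
print(is_partitioned_by(map_partitions(parts, keeps_key), 0))
# Key changed: the data would have to be repartitioned before grouping.
print(is_partitioned_by(map_partitions(parts, changes_key), 0))
```

An optimizer that knows a function behaves like `keeps_key` can skip the
shuffle before the next keyed operation; without that knowledge it has to
assume the worst case and repartition.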

Best, Fabian

2014-09-25 10:51 GMT+02:00 Fabian Hueske <fhueske@apache.org>:

> Hi,
>
> the plan shows all operator DOPs as 1.
> Did you create the plan locally or on the cluster with the correct DOP?
> The CLI client offers the -p parameter also for "info -e".
>
> BTW, you could try to set the DOP to the number of cores in your cluster.
> (But that doesn't explain why the job is so slow).
>
> 2014-09-25 10:01 GMT+02:00 Florian Hönicke <rockstarflo@gmail.com>:
>
>>  Yes, I also ran the massJoin on the cluster on 500MB.
>> I attached the execution plan.
>>
>> Greetings,
>> Florian
>>
>>
>> On 25.09.2014 at 00:41, Fabian Hueske wrote:
>>
>>  OK, the log shows that the tasks are evenly distributed to all nodes.
>> I assume you ran the program on the cluster on 500MB as well, right?
>>
>>  Can you please also post the execution plan for the cluster execution?
>> You get it with (See also:
>> http://flink.incubator.apache.org/docs/0.6-incubating/cli.html):
>> ./flink info -e jarfile.jar <parameters>
>>
>>  Thanks, Fabian
>>
>> 2014-09-25 0:21 GMT+02:00 Florian Hönicke <rockstarflo@gmail.com>:
>>
>>>  Thanks for your quick answer.
>>> In the following, I roughly sketch the mass-join algorithm:
>>> http://www.cs.berkeley.edu/~jnwang/papers/icde14_massjoin.pdf
>>> It's an R-S join which I modified into a self-join.
>>> Given a collection of token sets, the massJoin finds all similar sets
>>> (with respect to Jaccard similarity, i.e. intersection/union).
>>> First, it calculates a global token grouping, i.e., each token is
>>> assigned to one of 30 groups. Each group has almost the same token count.
>>> Then, it generates two types of signatures for each input set.
>>> If two sets are similar, they must share a common signature.
>>> In the next step, we find all candidate pairs (pairs which share a
>>> common signature).
>>> Some candidate pairs are filtered out using the global token grouping.
>>> The remaining candidate pairs are verified to filter out all dissimilar
>>> pairs.
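
The signature / candidate / verification steps described above can be
sketched as follows. This is a minimal single-machine illustration, not the
MassJoin implementation from the paper or from the thread: as a simplifying
assumption, every token serves as its own signature (two sets with Jaccard
similarity above zero always share at least one token), and the global token
grouping filter is left out. The function names are hypothetical.

```python
from collections import defaultdict
from itertools import combinations


def jaccard(a, b):
    """Jaccard similarity |a ∩ b| / |a ∪ b| of two token sets."""
    a, b = set(a), set(b)
    if not a and not b:
        return 1.0
    return len(a & b) / len(a | b)


def signatures(tokens):
    # Simplifying assumption: each token is one signature.
    # MassJoin itself uses a more selective signature scheme.
    return set(tokens)


def self_join(token_sets, threshold):
    """Return all index pairs (i, j), i < j, with Jaccard >= threshold."""
    # Step 1: group set ids by signature (an inverted index).
    index = defaultdict(list)
    for set_id, tokens in enumerate(token_sets):
        for sig in signatures(tokens):
            index[sig].append(set_id)

    # Step 2: every pair of sets sharing a signature is a candidate.
    candidates = set()
    for ids in index.values():
        candidates.update(combinations(sorted(ids), 2))

    # Step 3: verify candidates and drop the dissimilar ones.
    return sorted(
        (i, j)
        for i, j in candidates
        if jaccard(token_sets[i], token_sets[j]) >= threshold
    )


sets = [{"a", "b", "c"}, {"a", "b", "c", "d"}, {"x", "y"}, {"a", "b", "c"}]
print(self_join(sets, 0.9))   # [(0, 3)]
print(self_join(sets, 0.75))  # [(0, 1), (0, 3), (1, 3)]
```

With one signature per token, the candidate set can grow large for frequent
tokens, which is the kind of data replication a more selective signature
scheme is meant to limit.
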
>>>
>>> @Fabian
>>> I specified the DOP via the command-line client as follows:
>>> /home/hoenicke/flink-0.6-incubating/bin/flink run -p 11 \
>>> /home/hoenicke/flink-0.6-incubating/jar/mass6.jar 0.9 \
>>> file:///home/hoenicke/flink-0.6-incubating/input/inputNummeriert.txt \
>>> file:///home/hoenicke/flink-0.6-incubating/output -v
>>>
>>> The log file is attached.
>>>
>>> Best, Florian
>>>
>>> On 24.09.2014 at 22:45, Fabian Hueske wrote:
>>>
>>>  Hi,
>>>
>>>  how did you specify the degree of parallelism (DOP) for your program?
>>> Via the command-line client, the system configuration, or otherwise?
>>>
>>>  The JobManager log file (./log/*jobManager*.log) shows you the DOP
>>> of each task.
>>>
>>>  Best, Fabian
>>>
>>> 2014-09-24 18:41 GMT+02:00 Stephan Ewen <sewen@apache.org>:
>>>
>>>> Hi!
>>>>
>>>>  Ad-hoc, that is not easy to say. It depends on your algorithm, how
>>>> much data replication it does...
>>>>
>>>>  We'd need a bit of time to look into the code. It would help if you
>>>> could roughly sketch the algorithm for us and give us a breakdown of how
>>>> much time is spent in which operator (like a screenshot of the runtime web
>>>> monitor).
>>>>
>>>>  Greetings,
>>>> Stephan
>>>>
>>>>
>>>> On Wed, Sep 24, 2014 at 6:18 PM, Florian Hönicke <rockstarflo@gmail.com> wrote:
>>>>
>>>>> Hello :)
>>>>>
>>>>> my Flink program is extremely slow.
>>>>> I implemented a set similarity join in Flink (Mass-Join).
>>>>> Furthermore, I implemented a local version in Java.
>>>>> I compared both implementations.
>>>>> The local version needs one minute to process a 500MB dataset.
>>>>> My Flink program needs 5 minutes (cluster: 11 nodes, 20 000 MB RAM).
>>>>> I use Flink version 0.6.
>>>>> What could be the cause?
>>>>>
>>>>> I would welcome your response,
>>>>> Florian Hönicke
>>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>
