From: Fabian Hueske
To: user@flink.incubator.apache.org
Subject: Re: long runtime
Date: Thu, 25 Sep 2014 10:51:53 +0200
In-Reply-To: <5423CBF1.2050705@gmail.com>
References: <5422EEB9.4090803@googlemail.com> <542343CD.4080403@gmail.com> <5423CBF1.2050705@gmail.com>

Hi,

the plan shows all operator DOPs as 1.
Did you create the plan locally or on the cluster with the correct DOP? The CLI client also offers the -p parameter for "info -e".

BTW, you could try to set the DOP to the number of cores in your cluster. (But that doesn't explain why the job is so slow.)

2014-09-25 10:01 GMT+02:00 Florian Hönicke <rockstarflo@gmail.com>:
Yes, I also ran the massJoin on the cluster with 500 MB.
I attached the execution plan.

Greetings,
Florian


On 25.09.2014 at 00:41, Fabian Hueske wrote:
OK, the log shows that the tasks are evenly distributed to all nodes.
I assume you also ran the program on the cluster with 500 MB, right?

Can you please also post the execution plan for the cluster execution?
You get it with (see also: http://flink.incubator.apache.org/docs/0.6-incubating/cli.html):
./flink info -e jarfile.jar <parameters>

Thanks, Fabian

2014-09-25 0:21 GMT+02:00 Florian Hönicke <rockstarflo@gmail.com>:
Thanks for your quick answer.
In the following, I roughly sketch the mass-join algorithm.
http://www.cs.berkeley.edu/~jnwang/papers/icde14_massjoin.pdf
It is an R-S join, which I modified into a self-join.
Given a collection of token sets, the massJoin finds all pairs of similar sets with respect to Jaccard similarity (|intersection| / |union|).
First, it calculates a global token grouping, i.e., each token is assigned to one of 30 groups. Each group has almost the same token count.
Then, it generates two types of signatures for each input set.
If two sets are similar, they must share a common signature.
In the next step, we find all candidate pairs (pairs which share a common signature).
Some candidate pairs are filtered using the global token grouping.
The remaining candidate pairs are verified to filter out all dissimilar pairs.
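
As a rough illustration of the candidate-pair and verification steps described above, here is a minimal local sketch in Java. It is not the implementation discussed in this thread and it does not reproduce the signature scheme from the MassJoin paper: signatures are assumed to be computed already and are passed in as plain strings. The class name SimilarityJoinSketch, the method names, and the integer set ids are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper class; names are illustrative only.
class SimilarityJoinSketch {

    /** Jaccard similarity |a ∩ b| / |a ∪ b|; two empty sets count as identical. */
    public static double jaccard(Set<String> a, Set<String> b) {
        if (a.isEmpty() && b.isEmpty()) {
            return 1.0;
        }
        Set<String> intersection = new HashSet<>(a);
        intersection.retainAll(b);
        int unionSize = a.size() + b.size() - intersection.size();
        return (double) intersection.size() / unionSize;
    }

    /**
     * Candidate generation: every pair of set ids that shares at least one signature.
     * The map goes from a set id to its (assumed precomputed) signatures.
     */
    public static Set<List<Integer>> candidatePairs(Map<Integer, Set<String>> signatures) {
        // Invert the mapping: signature -> ids of the sets that carry it.
        Map<String, List<Integer>> bySignature = new HashMap<>();
        for (Map.Entry<Integer, Set<String>> e : signatures.entrySet()) {
            for (String sig : e.getValue()) {
                bySignature.computeIfAbsent(sig, k -> new ArrayList<>()).add(e.getKey());
            }
        }
        // Emit each unordered pair once, stored as [smaller id, larger id].
        Set<List<Integer>> pairs = new HashSet<>();
        for (List<Integer> ids : bySignature.values()) {
            for (int i = 0; i < ids.size(); i++) {
                for (int j = i + 1; j < ids.size(); j++) {
                    int x = ids.get(i);
                    int y = ids.get(j);
                    pairs.add(Arrays.asList(Math.min(x, y), Math.max(x, y)));
                }
            }
        }
        return pairs;
    }

    /** Verification: keep only candidates whose Jaccard similarity reaches the threshold. */
    public static Set<List<Integer>> verify(Set<List<Integer>> candidates,
                                            Map<Integer, Set<String>> sets,
                                            double threshold) {
        Set<List<Integer>> result = new HashSet<>();
        for (List<Integer> pair : candidates) {
            if (jaccard(sets.get(pair.get(0)), sets.get(pair.get(1))) >= threshold) {
                result.add(pair);
            }
        }
        return result;
    }
}
```

In a sketch like this, the nested loop in candidatePairs is quadratic in the number of sets that share a signature, so a few very frequent signatures can dominate the runtime. That is one reason a per-operator time breakdown is useful when diagnosing a slow join.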

@Fabian
I specified the DOP via the command-line client as follows:
/home/hoenicke/flink-0.6-incubating/bin/flink run -p 11 /home/hoenicke/flink-0.6-incubating/jar/mass6.jar 0.9 \
file:///home/hoenicke/flink-0.6-incubating/input/inputNummeriert.txt file:///home/hoenicke/flink-0.6-incubating/output -v

The log file is attached.

Best, Florian

On 24.09.2014 at 22:45, Fabian Hueske wrote:
Hi,

how did you specify the degree of parallelism (DOP) for your program?
Via the command-line client, the system configuration, or some other way?

The JobManager log file (./log/*jobManager*.log) tells you the DOP of each task.

Best, Fabian

2014-09-24 18:41 GMT+02:00 Stephan Ewen <sewen@apache.org>:
Hi!

Offhand, that is not easy to say. It depends on your algorithm, how much data replication it does...

We'd need a bit of time to look into the code. It would help if you could roughly sketch the algorithm for us and give us a breakdown of how much time is spent in which operator (like a screenshot of the runtime web monitor).

Greetings,
Stephan


On Wed, Sep 24, 2014 at 6:18 PM, Florian Hönicke <rockstarflo@gmail.com> wrote:
Hello :)

my Flink program is extremely slow.
I implemented a set similarity join in Flink (Mass-Join).
Furthermore, I implemented a local version in Java.
I compared both implementations.
The local version needs one minute to process a 500 MB dataset.
My Flink program needs 5 minutes (cluster: 11 nodes, 20,000 MB RAM).
I use Flink version 0.6.
What could be the cause?

I would welcome your response,
Florian Hönicke





