Subject: Re: Case of possible join optimization
From: Stephan Ewen <sewen@apache.org>
To: user@flink.apache.org
Date: Tue, 8 Sep 2015 11:22:09 +0200

The problem is the "getInput2()" call. It takes the input to the join, not the result of the join. That way, the first join never happens.
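As a minimal, self-contained sketch of the difference (the String payloads are hypothetical stand-ins for the List<MyObject> / List<ThriftObj> values from the thread):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;

    public class JoinResultVsInput {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Hypothetical stand-ins for the datasets in the thread.
            DataSet<Tuple2<String, String>> attrToExpand =
                    env.fromElements(new Tuple2<>("k1", "attrs"));
            DataSet<Tuple2<String, String>> bigDataset =
                    env.fromElements(new Tuple2<>("k1", "payload"));

            // getInput2() hands back the join's second *input* (bigDataset),
            // so the join itself never becomes part of the executed plan.
            DataSet<Tuple2<String, String>> justBigDataset = attrToExpand
                    .join(bigDataset).where(0).equalTo(0)
                    .getInput2();

            // Using the join result keeps the join in the plan.
            DataSet<Tuple3<String, String, String>> joined = attrToExpand
                    .join(bigDataset).where(0).equalTo(0)
                    .projectFirst(0, 1).projectSecond(1);

            joined.print();
        }
    }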
On Tue, Sep 8, 2015 at 11:10 AM, Flavio Pompermaier <pompermaier@okkam.it> wrote:

> Obviously, when trying to simplify my code, I didn't substitute the
> variable of the join correctly. It should be:
>
> DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset =
>     attrToExpand.join(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);
>
> Do you think that a JoinHint to create a sort-merge join is equivalent
> to my solution?
>
> On Tue, Sep 8, 2015 at 10:45 AM, Stephan Ewen <sewen@apache.org> wrote:
>
>> Hi Flavio!
>>
>> No, Flink does not join keys before full values. That is very often
>> very inefficient, as it effectively results in two joins, where one is
>> typically about as expensive as the original join.
>>
>> One can do a "semi-join reduction" in case the join filters out many
>> values (many elements from one side do not find a match in the other
>> side). If the join does not filter, this does not help either.
>>
>> Your code is a bit of a surprise, especially because in your solution
>> that worked, the first statement does nothing:
>>
>> DataSet<Tuple2<String, List<ThriftObj>>> subset =
>>     attrToExpand.project(0).joinWithHuge(bigDataset).where(0).equalTo(0).getInput2();
>>
>> This builds a join, but then takes the second input of the join (the
>> bigDataset data set). Because the result of the join is never actually
>> used, it is never executed. The second statement hence is effectively:
>>
>> DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset =
>>     attrToExpand.join(bigDataset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);
>>
>> I am curious why this executed when the original did not.
>>
>> BTW: If the lists are so long that they do not fit into a hash-table
>> memory partition, you can try a JoinHint to create a sort-merge join.
>> It may be slower, but it typically works with even less memory.
>>
>> Greetings,
>> Stephan
>>
>> On Tue, Sep 8, 2015 at 9:59 AM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>>
>>> Hi to all,
>>>
>>> I have a case where I don't understand why Flink is not able to
>>> optimize the join between two datasets.
>>>
>>> My initial code was basically this:
>>>
>>> DataSet<Tuple2<String, List<ThriftObj>>> bigDataset = ...; // 5,257,207 elements
>>> DataSet<Tuple2<String, List<MyObject>>> attrToExpand = ...; // 65,000 elements
>>>
>>> DataSet<Tuple2<String, IndexAttributeToExpand>> tmp =
>>>     attrToExpand.joinWithHuge(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);
>>>
>>> This job wasn't able to complete on my local machine (from Eclipse)
>>> because Flink was giving me the following error:
>>>
>>> Hash join exceeded maximum number of recursions, without reducing
>>> partitions enough to be memory resident. Probably cause: Too many
>>> duplicate keys.
>>>
>>> This was because in attrToExpand the List<MyObject> could be quite
>>> big. Indeed, changing that code to the following makes everything
>>> work like a charm:
>>>
>>> DataSet<Tuple2<String, List<ThriftObj>>> subset =
>>>     attrToExpand.project(0).joinWithHuge(bigDataset).where(0).equalTo(0).getInput2();
>>>
>>> DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset =
>>>     attrToExpand.join(subset).where(0).equalTo(0).projectFirst(0,1).projectSecond(1);
>>>
>>> Shouldn't it be possible for Flink to optimize my initial code into
>>> the second? I was convinced that Flink first performed the join only
>>> on the keys before also pulling the other elements of the tuples into
>>> memory. Am I wrong?
>>>
>>> Best,
>>> Flavio
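Regarding the JoinHint question, a rough sketch of passing the sort-merge hint in the DataSet API, reusing the simplified placeholder types from the sketch above:

    import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;

    // Ask the optimizer for a sort-merge join instead of the default
    // hybrid hash join, whose partition recursion failed here on the
    // duplicate-heavy keys.
    DataSet<Tuple3<String, String, String>> atomSubset = attrToExpand
            .join(bigDataset, JoinHint.REPARTITION_SORT_MERGE)
            .where(0).equalTo(0)
            .projectFirst(0, 1).projectSecond(1);

This is not literally equivalent to the two-step variant in the thread: it is still a single join between the full datasets, just executed with a merge strategy that, as noted above, may be slower but typically needs less memory.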
The problem is the "getInput2()" call. It takes = the input to the join, not the result of the join. That way, the first join= never happens.

On Tue, Sep 8, 2015 at 11:10 AM, Flavio Pompermaier &= lt;pompermaier@ok= kam.it> wrote:
Obviously when trying to simplify my code I didn't substitute corr= ectly the variable of the join..it should be:

DataS= et<Tuple3<String, List<MyObject>, List<ThriftObj>>>= atomSubset =3D
=C2=A0 =C2=A0 =C2=A0 attrToExpand.join(subset).w= here(0).equalTo(0).projectFirst(0,1).projectSecond(1);

Do you think that a Join= Hint to create a sort-merge join is equivalent to my solution?
<= div class=3D"h5">

On Tue, Sep 8, 2015 at 10:45 AM, Stephan Ewen <sewen@apache.org>= wrote:
Hi Flavio!

=
No, Flink does not join keys before full values. That is very of= ten very inefficient, as it results effectively in two joins where one is t= ypically about as expensive as the original join.

= One can do "semi-join-reduction", in case the join filters out ma= ny values (many elements from one side do not find a match in the other sid= e). If the join does not filter, this does not help either.

<= /div>
Your code is a bit of a surprise. Especially, because in you solu= tion that worked, the first statement does nothing:

DataSet<Tuple2<String, List&l= t;ThriftObj>>> subset =3D
=C2= =A0 =C2=A0 =C2=A0 attrToExpand.project(0).joinWithHuge(bigDataset).where(0)= .equalTo(0).getInput2();


This builds a join, but then takes the second input of the join (t= he=C2=A0bigDataset=C2=A0data set). = Because the result of the join is never actually=C2=A0used, it is never exe= cuted. The second statement hence is effectively

DataSet<Tuple3<String, List<MyObject>, List<= ThriftObj>>> atomSubset =3D=C2=A0
=C2=A0 =C2=A0 =C2=A0 attrToExpand.join(bigDataset).where(0).equ= alTo(0).projectFirst(0,1).projectSecond(1);


Curious why this executed when the original did not.

BTW= : If the Lists are very long so they do not fit into a hashtable memory par= tition, you can try to use a JoinHint to create a sort-merge join. It may b= ecome slower, but typically works with even less memory.


Greetings,
Stephan


=
On Tue, Sep 8, 2015 at 9:59 AM, Flavio Pompermai= er <pompermaier@okkam.it> wrote:
Hi to all,

I have a case where I don't understand why flink is not able to op= timize the join between 2 datasets.

My initial cod= e was basically this:

DataSet<Tuple2<String,= List<ThriftObj>>> bigDataset =3D ...;//5.257.207 elements
<= /div>
DataSet<Tuple2<String,List<MyObject>>> attrToEx= pand =3D ...;//65.000=C2=A0elements

DataSet<= ;Tuple2<String, IndexAttributeToExpand>> tmp =3D=C2=A0
attrToExpand.joinWithHuge(subset).where(0).equalTo(0).projectFirst(0,1).= projectSecond(1);

This job wasn't able to = complete on my local machine (from Eclipse) because Flink was giving me the= following error:

Hash join exceeded maximum = number of recursions, without reducing partitions enough to be memory resid= ent. Probably cause: Too many duplicate keys.

This was because in attrToExpand the List<MyObject> could be quite = big. Indeed, changing that code to the following make everything work like = a charm:

DataSet<Tuple2<String, List<= ;ThriftObj>>> subset =3D
=C2=A0 =C2=A0 =C2=A0 attrToExpa= nd.project(0).joinWithHuge(bigDataset).where(0).equalTo(0).getInput2();

DataSet<Tuple3<String, List<MyObject>, Li= st<ThriftObj>>> atomSubset =3D=C2=A0
=C2=A0 =C2=A0 = =C2=A0 attrToExpand.join(subset).where(0).equalTo(0).projectFirst(0,1).proj= ectSecond(1);


Isn't somet= hing impossible for Flink to optimize my initial code into the second? I wa= s convinced that Flink was performing a join only on the keys before grabbi= ng also the other elements of the Tuples into memory..am I wrong?

Best,
Flavio



=


--001a11407dba87b6b3051f38e651--
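The semi-join reduction Stephan mentions can be sketched roughly like this, again with the simplified placeholder types (it only pays off when the join actually filters out many records):

    import org.apache.flink.api.java.tuple.Tuple1;

    // Distinct keys of the small side.
    DataSet<Tuple1<String>> keys = attrToExpand.project(0);
    DataSet<Tuple1<String>> distinctKeys = keys.distinct();

    // Reduce the big side to the records that will find a match.
    DataSet<Tuple2<String, String>> reducedBig = bigDataset
            .join(distinctKeys).where(0).equalTo(0)
            .projectFirst(0, 1);

    // The full join now runs against the reduced big side only.
    DataSet<Tuple3<String, String, String>> result = attrToExpand
            .join(reducedBig).where(0).equalTo(0)
            .projectFirst(0, 1).projectSecond(1);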