From: Flavio Pompermaier
Date: Tue, 17 Mar 2015 17:58:10 +0100
Subject: Re: Union of multiple datasets vs Join
To: user
Reply-To: user@flink.apache.org
In-Reply-To: <54982163.3255c20a.2e35.ffffbc71@mx.google.com>
References: <54982163.3255c20a.2e35.ffffbc71@mx.google.com>

Hi Fabian,

I was trying to use the strategy you suggested with Flink 0.8.1, but it seems that the union of the datasets cannot be created programmatically: the union operator appears to give the generated dataset the name of the calling function, so that only the first dataset is read. My code looks like:

    private static DataSet<Tuple6<...>> getSourceDs(ExecutionEnvironment env,
            final String outputGraph, List<String> tableNames) {
        DataSet<Tuple6<...>> ret = null;
        for (String tableName : tableNames) {
            DataSet<Tuple6<...>> sourceDs = env.createInput(new MyTableInputFormat(tableName))
                    ....
            if (ret == null)
                ret = sourceDs;
            else
                ret.union(sourceDs);
        }
        return ret;
    }

Is this a bug or am I doing something wrong?

Thanks in advance,
Flavio

On Mon, Dec 22, 2014 at 2:42 PM, <fhueske@gmail.com> wrote:

> Union is just combining data from multiple sources into a single dataset.
> That’s it. No memory, no disk involved.
>
> In your case you have
>
>     input1.union(input2).groupBy(1).reduce(…)
>
> This will translate into:
>
>     input1 -> repartition ->
>                              read-both-inputs -> sort -> reduce
>     input2 -> repartition ->
>
> So, in your case not even additional network transfer is involved, because
> both data sets would need to be partitioned for the reduce anyway.
>
> Note, union in Flink has SQL union-all semantics, i.e., there is
> no removal of duplicates.
>
> Cheers, Fabian
>
> *From:* Flavio Pompermaier
> *Sent:* Monday, 22. December, 2014 14:32
> *To:* user@flink.incubator.apache.org
>
> Ok, thanks Fabian. I'd just like to know the internals of the union of
> multiple datasets (partitioning, distribution among servers, memory/disk,
> etc.). Do you have any reference for this?
>
> Thanks in advance,
> Flavio
>
> On Mon, Dec 22, 2014 at 12:46 PM, Fabian Hueske <fhueske@apache.org> wrote:
>
>> Follow the first approach.
>> Joins are expensive, union comes for free.
>>
>> Best, Fabian
>>
>> 2014-12-22 11:47 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>
>>> Hi guys,
>>>
>>> In my use case I have multiple Datasets with the same structure (e.g.
>>> Tuple3) and I want to produce an output Dataset containing all Tuple3
>>> grouped by the first field (0).
>>> I can obtain the same results by performing a union of all datasets and
>>> then a group by (simplest implementation), or by joining all of them
>>> pairwise (((A->B)->C)->D)..), or I don't know if there is any other
>>> solution. When should I use the first or the second approach? Could you
>>> help me in figuring out the internals of the two approaches? I always
>>> have some fear when using multiple joins when I don't know exactly
>>> their size.
>>>
>>> Best,
>>> Flavio
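
A note on the loop in the 17 Mar 2015 message: `union` on a Flink `DataSet` returns a new `DataSet` and leaves the receiver untouched, so calling `ret.union(sourceDs)` without assigning the result would leave `ret` pointing at the first source. That is one plausible explanation for only the first dataset being read. The sketch below illustrates the pattern in plain Java, without Flink on the classpath. `Rows`, `unionDiscardingResult`, and `unionKeepingResult` are hypothetical stand-ins invented for this illustration; they are not Flink classes or methods.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class UnionLoopSketch {

    /** Hypothetical stand-in for an immutable dataset handle; NOT Flink's DataSet. */
    static final class Rows {
        private final List<String> items;

        Rows(List<String> items) {
            this.items = Collections.unmodifiableList(new ArrayList<>(items));
        }

        /**
         * Returns a NEW Rows holding the elements of both inputs.
         * The receiver is left unchanged and duplicates are kept (union-all).
         */
        Rows union(Rows other) {
            List<String> merged = new ArrayList<>(items);
            merged.addAll(other.items);
            return new Rows(merged);
        }

        List<String> items() {
            return items;
        }
    }

    /** Mirrors the loop from the message: the value returned by union(...) is dropped. */
    static Rows unionDiscardingResult(List<Rows> sources) {
        Rows ret = null;
        for (Rows source : sources) {
            if (ret == null) {
                ret = source;
            } else {
                ret.union(source); // result ignored, so ret still refers to the first source
            }
        }
        return ret;
    }

    /** Same loop, but the returned handle is assigned back to ret. */
    static Rows unionKeepingResult(List<Rows> sources) {
        Rows ret = null;
        for (Rows source : sources) {
            ret = (ret == null) ? source : ret.union(source);
        }
        return ret;
    }

    public static void main(String[] args) {
        List<Rows> sources = Arrays.asList(
                new Rows(Arrays.asList("a1", "a2")),
                new Rows(Arrays.asList("b1")),
                new Rows(Arrays.asList("c1", "a1")));

        // prints [a1, a2]: only the first source survives
        System.out.println(unionDiscardingResult(sources).items());

        // prints [a1, a2, b1, c1, a1]: all sources, duplicate "a1" retained
        System.out.println(unionKeepingResult(sources).items());
    }
}
```

Applied to the method in the message, the corresponding change would be to assign the result in the `else` branch, i.e. `ret = ret.union(sourceDs);`.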