Date: Mon, 10 Nov 2014 12:05:47 +0100
Subject: Re: PartitionByHash and usage of KeySelector
From: Fabian Hueske
To: user@flink.incubator.apache.org

Yes, if you split the data set manually (maybe using filter) into multiple data sets, you could use Cross. However, Cross is a binary operation, so you would need to use it as a self-cross, which would produce symmetric pairs just like the join. I'm not sure I would do this in a single job, i.e., run all cross operations concurrently. It might be better to partition the data up-front and run a separate job for each group. Best, Fabian 2014-11-10 11:08 GMT+01:00 Stefano Bortoli : > Thanks a lot Fabian. You clarified many points. Currently I am trying to run > the job relying on a global index built with SOLR. It worked on a dataset > of about 1M records, but it failed with an obscure exception on the one of > 9.2M. If I cannot make it work, I will go back to the grouping approach. > > Just a question. If I create a dataset for each group of a dataset, then I > could use Cross on each of the groups. Right? However, I guess it would > be smarter to have a reduceGroup capable of generating just the pairs that > would need to be compared. > > Thanks a lot again. Keep up the great work! :-) > > saluti, > Stefano > > > 2014-11-10 10:50 GMT+01:00 Fabian Hueske : > >> Hi Stefano, >> >> I'm not sure if we use the same terminology here. What you call >> partitioning might be called grouping in Flink's API / documentation. >> >> Grouping builds groups of elements that share the same key. This is a >> deterministic operation. >> Partitioning distributes elements over a set of machines / parallel >> workers. 
If this is done using hash partitioning, Flink determines the >> parallel worker for an element by hashing the element's partition key ( >> mod(hash(key), #workers) ). Consequently, all elements with the same >> partition key will be shipped to the same worker, BUT so will all other >> elements for which mod(hash(key), #workers) happens to be the same. If you >> apply a mapPartition over these partitions, all of these elements will be >> mixed together. If the number of workers (or the hash function) >> changes, the partitions will look different. When grouping, all elements of the >> group have the same key (and all elements with that key are in the >> group). >> >> Flink's cross operator builds a dataset-wide cross product. It does not >> respect groups (or partitions). If you want to build a cross product within >> a group, you can do that with a groupReduce, which requires holding all >> elements of the group in memory or manually spilling them to disk in your UDF. >> Alternatively, you can use a self join (join a data set with itself), which >> will give you all pairs of the cross product in individual function calls. However, >> Flink does not currently treat self joins specially, so the >> performance could be further optimized. You'll also get symmetric pairs (a-b, b-a, >> a-a, b-b, for two elements a, b with the same join key). >> >> If it is possible to combine the macro-parameter keys and the >> minor-blocking keys into a single key, you could specify a key-selector >> function x() and either do >> - dataSet.groupBy(x).reduceGroup( *read full group into memory, and apply >> expensive function to each pair of elements* ); or >> - dataSet.join(dataSet).where(x).equalTo(x).with( *check for symmetric >> pairs and apply expensive compare function* ). >> >> BTW, there was a similar use case a few days back on the mailing list. >> Might be worth reading that thread [1]. 
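The first option above (read the full group into memory, then compare each pair) can be sketched in plain Java. This is only an illustration of what the reduceGroup UDF would do internally, not Flink API code; the type `String` and the group contents are placeholders:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class GroupCross {
    // Emit all unordered pairs within one group, as the reduceGroup UDF
    // would after collecting the full group into memory.
    static List<String[]> pairsWithinGroup(List<String> group) {
        List<String[]> pairs = new ArrayList<>();
        for (int i = 0; i < group.size(); i++) {
            // Start j at i + 1 to skip self-pairs (a-a) and the
            // symmetric duplicates (b-a) that a self join would emit.
            for (int j = i + 1; j < group.size(); j++) {
                pairs.add(new String[]{group.get(i), group.get(j)});
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        // One group whose elements all share the same key x(e).
        List<String> group = Arrays.asList("a", "b", "c");
        for (String[] p : pairsWithinGroup(group)) {
            System.out.println(p[0] + "-" + p[1]); // a-b, a-c, b-c
        }
    }
}
```

Note that for a group of n elements this still materializes n*(n-1)/2 pairs, which is why large groups need the spill-to-disk handling Fabian mentions.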
>> Since this is the second time that this issue has come up, we might >> consider adding better support for group-wise cross operations. >> >> Cheers, Fabian >> >> [1] >> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/load-balancing-groups-td2287.html
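The mod(hash(key), #workers) partitioning scheme Fabian describes can be demonstrated with plain Java. This is a sketch, not Flink's internal code; `Math.floorMod` is used here so negative hash codes still map to a valid worker index:

```java
import java.util.Arrays;
import java.util.List;

public class HashPartitionDemo {
    // Worker index for a key: mod(hash(key), #workers), as described above.
    static int workerFor(String key, int numWorkers) {
        return Math.floorMod(key.hashCode(), numWorkers);
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("apple", "banana", "cherry", "date");

        // Distinct keys can land on the same worker whenever their hashes
        // collide modulo the worker count -- a partition is NOT a group.
        for (String k : keys) {
            System.out.println(k + " -> worker " + workerFor(k, 4));
        }

        // Changing the number of workers reshuffles the assignment, which is
        // why the same data set can partition differently between runs.
        for (String k : keys) {
            System.out.println(k + " -> worker " + workerFor(k, 3));
        }
    }
}
```

In contrast, groupBy(key) guarantees that exactly the elements sharing a key end up in one group, independent of the worker count.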