Date: Mon, 10 Nov 2014 10:50:28 +0100
Subject: Re: PartitionByHash and usage of KeySelector
From: Fabian Hueske
To: user@flink.incubator.apache.org

Hi Stefano,

I'm not sure if we use the same terminology here. What you call partitioning might be called grouping in Flink's API / documentation.

Grouping builds groups of elements that share the same key. This is a deterministic operation.
Partitioning distributes elements over a set of machines / parallel workers. If this is done using hash partitioning, Flink determines the parallel worker for an element by hashing the element's partition key ( mod(hash(key), #workers) ). Consequently, all elements with the same partition key will be shipped to the same worker, BUT so will all other elements for which mod(hash(key), #workers) is the same. If you mapPartition over these partitions, all of these elements will be mixed. If the number of workers (or the hash function) changes, the partitions will look different. When grouping, all elements of a group will have the same key (and all elements with that key will be in the group).

Flink's cross operator builds a dataset-wide cross product. It does not respect groups (or partitions). If you want to build a cross product within a group, you can do that with a groupReduce, which requires you to hold all elements of the group in memory or manually spill them to disk in your UDF. Alternatively, you can use a self join (joining a data set with itself), which will give you all pairs of the cross product in individual function calls. However, Flink does not currently treat self joins specially, so the performance is not optimized for this case. You'll also get symmetric pairs (a-b, b-a, a-a, b-b, for two elements a, b with the same join key).
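To make the grouping vs. partitioning distinction concrete, here is a minimal stand-alone sketch of the mod(hash(key), #workers) assignment in plain Java (this is illustrative only; Flink's actual hash function and partitioner are more involved):

```java
import java.util.Objects;

public class HashPartitionDemo {

    // Assign a key to one of n parallel workers: mod(hash(key), n).
    // Math.floorMod keeps the result non-negative even for negative hash codes.
    static int workerFor(Object key, int numWorkers) {
        return Math.floorMod(Objects.hashCode(key), numWorkers);
    }

    public static void main(String[] args) {
        // Two distinct keys can collide on the same worker ("a" and "e" do for 4 workers),
        // so a partition generally mixes elements of several keys ...
        System.out.println(workerFor("a", 4));
        System.out.println(workerFor("e", 4));
        // ... and the assignment changes when the number of workers changes.
        System.out.println(workerFor("a", 5));
    }
}
```

A group, by contrast, would contain exactly the elements sharing one key, independent of the worker count.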
If it is possible to combine the macro-parameter keys and the minor-blocking keys into a single key, you could specify a key-selector function x() and either do
- dataSet.groupBy(x).reduceGroup( *read the full group into memory and apply the expensive function to each pair of elements* ); or
- dataSet.join(dataSet).where(x).equalTo(x).with( *check for symmetric pairs and apply the expensive compare function* ).

BTW, there was a similar use case on the mailing list a few days back. Might be worth reading that thread [1].
Since this is the second time that this issue has come up, we might consider adding better support for group-wise cross operations.

Cheers, Fabian

[1] http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/load-balancing-groups-td2287.html
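For reference, the in-memory group-wise cross that a reduceGroup UDF would perform can be sketched with plain Java collections (the key extractor stands in for the key-selector x(); this ignores Flink's memory management and spilling):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class GroupCrossSketch {

    // All ordered pairs (a, b) within each key group -- the cross product
    // a reduceGroup UDF would compute after reading its group into memory.
    // Note: this includes symmetric (a-b and b-a) and self (a-a) pairs,
    // just like the self-join approach described above.
    static <T, K> List<List<T>> crossPerGroup(List<T> data, Function<T, K> keyOf) {
        Map<K, List<T>> groups = new HashMap<>();
        for (T t : data) {
            groups.computeIfAbsent(keyOf.apply(t), k -> new ArrayList<>()).add(t);
        }
        List<List<T>> pairs = new ArrayList<>();
        for (List<T> group : groups.values()) {
            for (T a : group) {
                for (T b : group) {
                    pairs.add(List.of(a, b));
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        // "a1" and "a2" share key 'a'; "b1" has key 'b'.
        List<List<String>> pairs =
                crossPerGroup(List.of("a1", "a2", "b1"), s -> s.charAt(0));
        System.out.println(pairs.size()); // 2*2 + 1*1 = 5 pairs
    }
}
```

In Flink this logic would live inside the GroupReduceFunction passed to reduceGroup, applied to one group's iterator at a time.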