flink-user mailing list archives

From Stefano Bortoli <s.bort...@gmail.com>
Subject Re: PartitionByHash and usage of KeySelector
Date Mon, 10 Nov 2014 10:08:44 GMT
Thanks a lot Fabian. You clarified many points. Currently I am trying to run
the job relying on a global index built with SOLR. It worked on a dataset
of about 1M records, but it failed with an obscure exception on the one of
9.2M. If I cannot make it work, I will go back to the grouping approach.

Just a question: if I create a dataset for each group of a dataset, I could
then use cross on each of the groups, right? However, I guess it would be
smarter to have a reduceGroup capable of generating just the pairs that
need to be compared.
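The pair-generation idea can be sketched in plain Java (outside any Flink API; class and method names here are hypothetical, purely for illustration): instead of a full cross product, a reduceGroup UDF could emit each unordered pair of distinct group elements exactly once.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class PairGenerator {
    // Emit each unordered pair of distinct elements exactly once:
    // (a, b) is produced, but (b, a) and (a, a) are not.
    // For a group of n elements this yields n * (n - 1) / 2 pairs,
    // roughly half the n * n pairs a naive self cross product produces.
    public static <T> List<Map.Entry<T, T>> uniquePairs(List<T> group) {
        List<Map.Entry<T, T>> pairs = new ArrayList<>();
        for (int i = 0; i < group.size(); i++) {
            for (int j = i + 1; j < group.size(); j++) {
                pairs.add(new SimpleEntry<>(group.get(i), group.get(j)));
            }
        }
        return pairs;
    }
}
```

The same double loop could run inside a groupReduce function body, with the expensive comparison applied to each emitted pair.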

Thanks a lot again. Keep up the great work! :-)


2014-11-10 10:50 GMT+01:00 Fabian Hueske <fhueske@apache.org>:

> Hi Stefano,
> I'm not sure if we use the same terminology here. What you call
> partitioning might be called grouping in Flink's API / documentation.
> Grouping builds groups of elements that share the same key. This is a
> deterministic operation.
> Partitioning distributes elements over a set of machines / parallel
> workers. If this is done using hash partitioning, Flink determines the
> parallel worker for an element by hashing the element's partition key
> ( mod(hash(key), #workers) ). Consequently, all elements with the same
> partition key are shipped to the same worker, BUT so are all other
> elements for which mod(hash(key), #workers) is the same. If you apply a
> mapPartition over these partitions, all of these elements will be mixed
> together. If the number of workers (or the hash function) changes, the
> partitions will look different. When grouping, all elements of a group
> have the same key (and all elements with that key are in the group).
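The assignment rule mod(hash(key), #workers) can be illustrated with a tiny sketch (a simplification for illustration only; Flink's internal hashing differs). It shows why two unrelated keys can land on the same worker:

```java
public class PartitionAssigner {
    // Simplified hash partitioning: the target worker for an element is
    // mod(hash(key), #workers). Math.floorMod keeps the result
    // non-negative even when hashCode() is negative.
    // NOTE: this mirrors only the idea from the explanation above, not
    // Flink's actual partitioner implementation.
    public static int targetWorker(Object key, int numWorkers) {
        return Math.floorMod(key.hashCode(), numWorkers);
    }
}
```

With 4 workers, the keys 1 and 5 collide on the same worker even though they are different keys, which is exactly why a partition is not the same thing as a group.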
> Flink's cross operator builds a dataset-wide cross product. It does not
> respect groups (or partitions). If you want to build a cross product within
> a group, you can do that with a groupReduce, which requires you to hold all
> elements of the group in memory or to manually spill them to disk in your
> UDF. Alternatively, you can use a self join (joining a data set with
> itself), which gives you all pairs of the cross product in individual
> function calls. However, Flink does not currently treat self joins
> specially, so their performance is not optimized. You will also get
> symmetric pairs (a-b, b-a, a-a, b-b, for two elements a, b with the same
> join key).
> If it is possible to combine the macro-parameter keys and the
> minor-blocking keys into a single key, you could specify a key-selector
> function x() and either do
> - dataSet.groupBy(x).reduceGroup( *read the full group into memory and
> apply the expensive function to each pair of elements* ); or
> - dataSet.join(dataSet).where(x).equalTo(x).with( *check for symmetric
> pairs and apply the expensive compare function* ).
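The symmetric-pair check in the second option could, for example, compare unique element ids (a hypothetical field, assumed for this sketch; the original records may identify elements differently): keeping only pairs where the left id is strictly smaller retains exactly one copy of each symmetric pair and drops the trivial self-pairs.

```java
public class SelfJoinFilter {
    // In a self join, each pair of matching elements appears as (a, b),
    // (b, a), and the trivial (a, a). Assuming each element carries a
    // unique numeric id (an assumption made for this sketch), keeping
    // only pairs with leftId < rightId deduplicates the output:
    // exactly one of (a, b) / (b, a) survives, and (a, a) is dropped.
    public static boolean keepPair(long leftId, long rightId) {
        return leftId < rightId;
    }
}
```

Inside a Flink join function, pairs failing this check would simply not be emitted, so the expensive comparison runs once per unordered pair.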
> BTW, there was a similar use case on the mailing list a few days back. It
> might be worth reading that thread [1].
> Since this is the second time this issue has come up, we might consider
> adding better support for group-wise cross operations.
> Cheers, Fabian
> [1]
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/load-balancing-groups-td2287.html
