flink-user mailing list archives

From Philippe Caparroy <philippe.capar...@orange.fr>
Subject Re: Using CustomPartitionerWrapper with KeyedStream
Date Fri, 12 Aug 2016 10:43:48 GMT
Hi Max,

Thanks for the answer.
I need to ensure that, in a parallel window operation (which relies on a KeyedStream), each
partition of the window's output stream contains a single key.
I can obtain this with a custom partitioner applied just after the window, but relying on the
partitioner of the KeyedStream would avoid that extra transformation.
I was just wondering whether there was a particular reason to limit the partitioner of the
KeyedStream to a HashPartitioner.
I have no bottleneck or performance problems anyway.
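
To illustrate the custom partitioner applied after the window, here is a minimal sketch. It assumes the set of keys is known up front and is no larger than the number of partitions. KeyPartitioner is a hypothetical local interface that mirrors the partition(key, numPartitions) shape of Flink's Partitioner so that the example compiles without Flink; LookupPartitioner and the sample keys are likewise made up for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class OneKeyPerPartitionSketch {

    // Hypothetical local interface with the same shape as Flink's
    // Partitioner<K>, declared here so the sketch compiles without Flink.
    interface KeyPartitioner<K> {
        int partition(K key, int numPartitions);
    }

    // Assigns each known key its own partition index via an explicit table,
    // so no two keys can share a partition.
    static final class LookupPartitioner<K> implements KeyPartitioner<K> {
        private final Map<K, Integer> indexByKey = new LinkedHashMap<>();

        LookupPartitioner(List<K> knownKeys) {
            for (K key : knownKeys) {
                // The next free index is the current table size.
                indexByKey.putIfAbsent(key, indexByKey.size());
            }
        }

        @Override
        public int partition(K key, int numPartitions) {
            Integer index = indexByKey.get(key);
            if (index == null) {
                throw new IllegalArgumentException("Unknown key: " + key);
            }
            if (index >= numPartitions) {
                throw new IllegalStateException("More keys than partitions");
            }
            return index;
        }
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c");
        KeyPartitioner<String> partitioner = new LookupPartitioner<>(keys);
        for (String key : keys) {
            System.out.println(key + " -> " + partitioner.partition(key, 3));
        }
    }
}
```

In a real job the same logic would sit in a Flink Partitioner applied to the window's output stream (I believe via partitionCustom), which is the extra transformation mentioned above.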

Best regards.
> On 12 Aug 2016, at 12:06, Maximilian Michels <mxm@apache.org> wrote:
> 
> Hi Philippe,
> 
> There is no particular reason other than that hash partitioning is a
> sensible default for most users. It seems like this is rarely an
> issue. When the number of keys is close to the parallelism, having
> idle partitions is usually not a problem due to low data volume. I see
> that it could be a problem if you had multiple "hotspot" keys, but then
> you will have a hard time parallelizing the workload anyway.
> 
> Does this limitation really impact performance for you, or is this
> a question of a theoretical nature? :) In any case, we could file an issue
> and allow other partitioners for keyed streams.
> 
> Best,
> Max
> 
> 
> On Thu, Aug 11, 2016 at 10:53 PM, Philippe Caparroy
> <philippe.caparroy@orange.fr> wrote:
>> Hi there,
>> 
>> It does not seem possible to use a custom partitioner with a
>> KeyedStream without modifying the KeyedStream class itself.
>> 
>> 
>> protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
>>     throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream.");
>> }
>> 
>> In some particular situations, such as when the number of keys is small
>> and close to the number of partitions, using
>> keyBy(<keyExtractor>).window(<windowAssigner>).<windowOperation>
>> 
>> might result in collisions in the partition indexes (and hence empty
>> partitions) assigned by the HashPartitioner that is imposed on the
>> KeyedStream:
>> 
>> public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector,
>>         TypeInformation<KEY> keyType) {
>>     super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(
>>             dataStream.getTransformation(), new HashPartitioner<>(keySelector)));
>>     this.keySelector = keySelector;
>>     this.keyType = keyType;
>> }
>> 
>> due to the characteristics of the underlying hash function (this holds for any hash function):
>> 
>> returnArray[0] = MathUtils.murmurHash(key.hashCode()) % numberOfOutputChannels;
>> 
>> Is there a particular reason to force the KeyedStream to use a
>> HashPartitioner?
>> 
>> Thanks in advance and best regards.
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
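
The collision effect described in the quoted message (hash, then reduce modulo the number of output channels) can be observed with a small standalone sketch. Note the assumptions: mix() is the MurmurHash3 finalizer used as a stand-in, not Flink's MathUtils.murmurHash, and the class name, helper names and sample keys are all hypothetical. Math.floorMod is used instead of % so that the index stays non-negative whatever the hash value.

```java
import java.util.Arrays;

class HashChannelSketch {

    // Stand-in 32-bit mixer (the MurmurHash3 finalizer). This is an assumption
    // for illustration, not Flink's MathUtils.murmurHash.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Channel index for a key: hash, then reduce modulo the channel count.
    // Math.floorMod keeps the result non-negative even for negative hashes.
    static int channelFor(Object key, int numberOfOutputChannels) {
        return Math.floorMod(mix(key.hashCode()), numberOfOutputChannels);
    }

    // Number of keys landing on each channel.
    static int[] countPerChannel(Object[] keys, int numberOfOutputChannels) {
        int[] counts = new int[numberOfOutputChannels];
        for (Object key : keys) {
            counts[channelFor(key, numberOfOutputChannels)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        Object[] keys = {"sensor-a", "sensor-b", "sensor-c", "sensor-d"};
        int[] counts = countPerChannel(keys, 4);
        // A 0 in the output is an idle channel; a value above 1 is a collision.
        System.out.println(Arrays.toString(counts));
    }
}
```

With as many keys as channels, a one-key-per-channel layout only occurs if the hash happens to map the keys to distinct indexes, which no general-purpose hash function guarantees.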

