flink-dev mailing list archives

From Greg Hogan <c...@greghogan.com>
Subject Re: KeyGroupRangeAssignment ?
Date Tue, 21 Feb 2017 15:10:06 GMT
Integer's hashCode is the identity function. Store your slot index in an
Integer or IntValue and key off that field.
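
To make the suggestion concrete, here is a minimal Java sketch, not Flink code. It assumes the key group is computed as murmurHash(key.hashCode()) % maxParallelism, as described further down in this thread, and that a key group maps to a parallel subtask via keyGroup * parallelism / maxParallelism, which is an assumption about KeyGroupRangeAssignment that should be checked against your Flink version. The class KeyGroupSketch and its method names are hypothetical, and murmurHash below is a stand-in modeled on 32-bit MurmurHash3, so its exact outputs may differ from Flink's. Because Integer.hashCode() returns the value itself, the search can simply try small integers until it has one key per subtask.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Flink.
final class KeyGroupSketch {

    // Finalization mix of 32-bit MurmurHash3 (standard constants).
    static int bitMix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Assumption: stand-in for Flink's murmurHash(int); always non-negative.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code = bitMix(code);
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    // Formula quoted in the thread: murmurHash(key.hashCode()) % maxParallelism.
    static int keyGroup(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // Assumption: key groups are split into contiguous ranges, one per subtask.
    static int subtaskIndex(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Brute-force search: result[i] is an Integer key that lands on subtask i.
    static Integer[] keysCoveringAllSubtasks(int maxParallelism, int parallelism) {
        if (parallelism < 1 || maxParallelism < parallelism) {
            throw new IllegalArgumentException("need 1 <= parallelism <= maxParallelism");
        }
        Integer[] keyFor = new Integer[parallelism];
        int found = 0;
        for (int candidate = 0; candidate < 1_000_000 && found < parallelism; candidate++) {
            int idx = subtaskIndex(keyGroup(candidate, maxParallelism), maxParallelism, parallelism);
            if (keyFor[idx] == null) {
                keyFor[idx] = candidate;
                found++;
            }
        }
        if (found < parallelism) {
            throw new IllegalStateException("no key found for some subtask");
        }
        return keyFor;
    }

    public static void main(String[] args) {
        // Example values only: 128 key groups, 16 parallel subtasks.
        System.out.println(Arrays.toString(keysCoveringAllSubtasks(128, 16)));
    }
}
```

With such a table you would replace each original key by keys[slotIndex], store it in an Integer field, and key off that field. For a real job, the search should call Flink's own key group assignment logic instead of this stand-in hash.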

On Tue, Feb 21, 2017 at 6:04 AM, Ovidiu-Cristian MARCU <
ovidiu-cristian.marcu@inria.fr> wrote:

> Hi,
>
> As in my example, each key is a window, so I want to distribute
> processing evenly across all slots.
> If I have 100 keys and 100 slots, and each key has the same rate of
> events, I don’t want a skewed distribution.
>
> Best,
> Ovidiu
>
> > On 21 Feb 2017, at 11:38, Aljoscha Krettek <aljoscha@apache.org> wrote:
> >
> > Hi Ovidiu,
> > what's the reason for wanting to make the parallelism equal to the number
> > of keys? I think in general it's very hard to ensure that hashes even go
> > to different key groups. It can always happen that all your keys (if you
> > have so few of them) are assigned to the same parallel operator instance.
> >
> > Cheers,
> > Aljoscha
> >
> > On Tue, 21 Feb 2017 at 10:53 Till Rohrmann <trohrmann@apache.org> wrote:
> >
> >> Hi Ovidiu,
> >>
> >> at the moment it is not possible to plug in a user-defined hash
> >> function/key group assignment function. If you like, you can file a JIRA
> >> issue to add this functionality.
> >>
> >> The key group assignment in your example looks quite skewed. One question
> >> concerning how you calculated it: shouldn't the number of elements in each
> >> group sum up to 1024? This only works for the first case. What do the
> >> numbers mean then?
> >>
> >> Cheers,
> >> Till
> >>
> >> On Mon, Feb 20, 2017 at 3:45 PM, Ovidiu-Cristian MARCU <
> >> ovidiu-cristian.marcu@inria.fr> wrote:
> >>
> >>> Hi,
> >>>
> >>> Thank you for clarifications (I am working with KeyedStream so a custom
> >>> partitioner does not help).
> >>>
> >>> So I should set maxParallelism >= parallelism and change my keys (from
> >>> input.keyBy(0)) so that key group assignment works as expected,
> >>> but I can’t modify these keys to make it work.
> >>>
> >>> The other option is to change Flink’s internals in order to evenly
> >>> distribute keys (changing computeKeyGroupForKeyHash: is this enough?).
> >>> What I was looking for was an API to change the way key group assignment
> >>> is done, but without changing Flink’s runtime.
> >>>
> >>> I think that the maxParallelism setting is not enough (it introduces this
> >>> inefficient way of distributing data for processing when using
> >>> KeyedStream).
> >>> Is it possible to somehow expose the key group assignment?
> >>>
> >>> This is how keys are distributed (1024 keys, key=1..1024; and groups from
> >>> 2 to 16, equivalent to the parallelism, that is, the number of slots):
> >>>
> >>> {0=517, 1=507} 2
> >>> {0=881, 1=809, 2=358} 3
> >>> {0=1139, 1=1048, 2=617, 3=268} 4
> >>> {0=1319, 1=1268, 2=829, 3=473, 4=207} 5
> >>> {0=1512, 1=1425, 2=1008, 3=644, 4=352, 5=179} 6
> >>> {0=1656, 1=1586, 2=1160, 3=781, 4=512, 5=310, 6=139} 7
> >>> {0=1781, 1=1718, 2=1280, 3=908, 4=645, 5=417, 6=278, 7=141} 8
> >>> {0=1901, 1=1828, 2=1395, 3=1031, 4=738, 5=529, 6=399, 7=240, 8=131} 9
> >>> {0=1996, 1=1934, 2=1493, 3=1134, 4=846, 5=614, 6=513, 7=354, 8=233,
> >>> 9=99} 10
> >>> {0=2094, 1=2017, 2=1577, 3=1226, 4=935, 5=713, 6=610, 7=434, 8=359,
> >>> 9=174, 10=101} 11
> >>> {0=2192, 1=2091, 2=1669, 3=1316, 4=1008, 5=797, 6=705, 7=517, 8=446,
> >>> 9=255, 10=173, 11=95} 12
> >>> {0=2257, 1=2175, 2=1741, 3=1396, 4=1079, 5=882, 6=785, 7=596, 8=524,
> >>> 9=340, 10=254, 11=186, 12=73} 13
> >>> {0=2330, 1=2258, 2=1816, 3=1458, 4=1160, 5=951, 6=858, 7=667, 8=602,
> >>> 9=417, 10=329, 11=265, 12=135, 13=66} 14
> >>> {0=2397, 1=2323, 2=1897, 3=1542, 4=1233, 5=1008, 6=934, 7=723, 8=671,
> >>> 9=479, 10=385, 11=344, 12=210, 13=118, 14=72} 15
> >>> {0=2454, 1=2395, 2=1949, 3=1603, 4=1296, 5=1055, 6=998, 7=803, 8=739,
> >>> 9=539, 10=453, 11=410, 12=280, 13=178, 14=147, 15=61} 16
> >>>
> >>> Best,
> >>> Ovidiu
> >>>
> >>>> On 20 Feb 2017, at 12:04, Till Rohrmann <trohrmann@apache.org> wrote:
> >>>>
> >>>> Hi Ovidiu,
> >>>>
> >>>> the way Flink works is to assign key group ranges to operators. For each
> >>>> element you calculate a hash value and based on that you assign it to a
> >>>> key group. Thus, in your example, you have either a key group with more
> >>>> than 1 key or multiple key groups with 1 or more keys assigned to an
> >>>> operator.
> >>>>
> >>>> So what you could try to do is to reduce the number of key groups to
> >>>> your parallelism via env.setMaxParallelism() and then try to figure out a
> >>>> key whose hashes are uniformly distributed over the key groups. The key
> >>>> group assignment is calculated via murmurHash(key.hashCode()) %
> >>>> maxParallelism.
> >>>>
> >>>> Alternatively, if you don’t need a keyed stream, you could try to use a
> >>>> custom partitioner via DataStream.partitionCustom.
> >>>>
> >>>> Cheers,
> >>>> Till
> >>>>
> >>>>
> >>>> On Mon, Feb 20, 2017 at 11:46 AM, Ovidiu-Cristian MARCU <
> >>>> ovidiu-cristian.marcu@inria.fr> wrote:
> >>>> Hi,
> >>>>
> >>>> Can you please comment on how I can ensure stream input records are
> >>>> distributed evenly onto task slots?
> >>>> See attached screen Records received issue.
> >>>>
> >>>> I have a simple application which applies some window function over
> >>>> a stream partitioned as follows:
> >>>> (parallelism is equal to the number of keys; records with the same key
> >>>> are streamed evenly)
> >>>>
> >>>> // get the execution environment
> >>>> final StreamExecutionEnvironment env =
> >>>>     StreamExecutionEnvironment.getExecutionEnvironment();
> >>>> // get input data by connecting to the socket
> >>>> DataStream<String> text = env.socketTextStream("localhost", port, "\n");
> >>>> DataStream<Tuple8<String, String, String, Integer, String, Double, Long, Long>> input =
> >>>>     text.flatMap(...);
> >>>> // key by the first tuple field, then apply a sliding count window
> >>>> DataStream<Double> counts1 = input.keyBy(0).countWindow(windowSize, slideSize)
> >>>>     .apply(new WindowFunction<Tuple8<String, String, String, Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() {
> >>>>         ...
> >>>>     });
> >>>> counts1.writeAsText(params.get("output1"));
> >>>> env.execute("Socket Window WordCount");
> >>>>
> >>>> Best,
> >>>> Ovidiu
> >>>>
> >>>>
> >>>
> >>>
> >>
>
>
