Subject: Re: KeyGroupRangeAssignment ?
From: Aljoscha Krettek
Date: Tue, 21 Feb 2017 10:38:59 +0000
To: dev@flink.apache.org

Hi Ovidiu,
what's the reason for wanting to make the parallelism equal to the number
of keys? I think in general it's very hard to ensure that hashes even go
to different key groups. It can always happen that all your keys (if you
have so few of them) are assigned to the same parallel operator instance.

Cheers,
Aljoscha

On Tue, 21 Feb 2017 at 10:53 Till Rohrmann wrote:

> Hi Ovidiu,
>
> at the moment it is not possible to plug in a user-defined hash
> function/key-group assignment function. If you like, you can file a JIRA
> issue to add this functionality.
>
> The key group assignment in your example looks quite skewed. One question
> concerning how you calculated it: shouldn't the number of elements in
> each group sum up to 1024? This only works for the first case. What do
> the numbers mean then?
>
> Cheers,
> Till
>
> On Mon, Feb 20, 2017 at 3:45 PM, Ovidiu-Cristian MARCU <
> ovidiu-cristian.marcu@inria.fr> wrote:
>
> > Hi,
> >
> > Thank you for the clarifications (I am working with KeyedStream, so a
> > custom partitioner does not help).
> >
> > So I should set maxParallelism >= parallelism and change my keys (from
> > input.keyBy(0)) so that key group assignment works as expected,
> > but I can't modify these keys in order to make it work.
> >
> > The other option is to change Flink's internals in order to evenly
> > distribute keys (changing computeKeyGroupForKeyHash: is this enough?).
> > What I was looking for was an API to change the way key group
> > assignment is done, but without changing Flink's runtime.
> >
> > I think that the maxParallelism setting is not enough (it introduces
> > this inefficient way of distributing data for processing when using
> > KeyedStream).
> > Is it possible to expose somehow the key group assignment?
> >
> > This is how keys are distributed (1024 keys, key=1..1024; and groups
> > from 2 to 16 - equiv. parallelism, that is, the number of slots):
> >
> > {0=517, 1=507} 2
> > {0=881, 1=809, 2=358} 3
> > {0=1139, 1=1048, 2=617, 3=268} 4
> > {0=1319, 1=1268, 2=829, 3=473, 4=207} 5
> > {0=1512, 1=1425, 2=1008, 3=644, 4=352, 5=179} 6
> > {0=1656, 1=1586, 2=1160, 3=781, 4=512, 5=310, 6=139} 7
> > {0=1781, 1=1718, 2=1280, 3=908, 4=645, 5=417, 6=278, 7=141} 8
> > {0=1901, 1=1828, 2=1395, 3=1031, 4=738, 5=529, 6=399, 7=240, 8=131} 9
> > {0=1996, 1=1934, 2=1493, 3=1134, 4=846, 5=614, 6=513, 7=354, 8=233,
> > 9=99} 10
> > {0=2094, 1=2017, 2=1577, 3=1226, 4=935, 5=713, 6=610, 7=434, 8=359,
> > 9=174, 10=101} 11
> > {0=2192, 1=2091, 2=1669, 3=1316, 4=1008, 5=797, 6=705, 7=517, 8=446,
> > 9=255, 10=173, 11=95} 12
> > {0=2257, 1=2175, 2=1741, 3=1396, 4=1079, 5=882, 6=785, 7=596, 8=524,
> > 9=340, 10=254, 11=186, 12=73} 13
> > {0=2330, 1=2258, 2=1816, 3=1458, 4=1160, 5=951, 6=858, 7=667, 8=602,
> > 9=417, 10=329, 11=265, 12=135, 13=66} 14
> > {0=2397, 1=2323, 2=1897, 3=1542, 4=1233, 5=1008, 6=934, 7=723, 8=671,
> > 9=479, 10=385, 11=344, 12=210, 13=118, 14=72} 15
> > {0=2454, 1=2395, 2=1949, 3=1603, 4=1296, 5=1055, 6=998, 7=803, 8=739,
> > 9=539, 10=453, 11=410, 12=280, 13=178, 14=147, 15=61} 16
> >
> > Best,
> > Ovidiu
> >
> > > On 20 Feb 2017, at 12:04, Till Rohrmann wrote:
> > >
> > > Hi Ovidiu,
> > >
> > > the way Flink works is to assign key group ranges to
operators. For each
> > element you calculate a hash value and based on that you assign it to
> > a key group. Thus, in your example, you have either a key group with
> > more than 1 key or multiple key groups with 1 or more keys assigned to
> > an operator.
> > >
> > > So what you could try to do is to reduce the number of key groups to
> > your parallelism via env.setMaxParallelism() and then try to figure out
> > a key whose hashes are uniformly distributed over the key groups. The
> > key group assignment is calculated via murmurHash(key.hashCode()) %
> > maxParallelism.
> > >
> > > Alternatively, if you don't need a keyed stream, you could try to use
> > a custom partitioner via DataStream.partitionCustom.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Mon, Feb 20, 2017 at 11:46 AM, Ovidiu-Cristian MARCU <
> > ovidiu-cristian.marcu@inria.fr> wrote:
> > > Hi,
> > >
> > > Can you please comment on how I can ensure stream input records are
> > > distributed evenly onto task slots?
> > > See the attached screenshot of the Records received issue.
> > >
> > > I have a simple application which is applying some window function
> > > over a stream partitioned as follows:
> > > (parallelism is equal to the number of keys; records with the same
> > > key are streamed evenly)
> > >
> > > // get the execution environment
> > > final StreamExecutionEnvironment env =
> > >     StreamExecutionEnvironment.getExecutionEnvironment();
> > > // get input data by connecting to the socket
> > > DataStream<String> text = env.socketTextStream("localhost", port, "\n");
> > > DataStream<Tuple5<Integer, String, Double, Long, Long>> input =
> > >     text.flatMap(...);
> > > DataStream<Double> counts1 = null;
> > > counts1 = input.keyBy(0).countWindow(windowSize, slideSize)
> > >     .apply(new WindowFunction<Tuple5<Integer, String, Double, Long,
> > >             Long>, Double, Tuple, GlobalWindow>() {
> > >         ...
> > >     });
> > > counts1.writeAsText(params.get("output1"));
> > > env.execute("Socket Window WordCount");
> > >
> > > Best,
> > > Ovidiu
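[Editor's note] Till's formula (key group = murmurHash(key.hashCode()) % maxParallelism, with each operator instance owning a contiguous range of key groups) can be sketched as below. This is a simplified, hypothetical stand-in: the murmur step uses the MurmurHash3 32-bit finalizer rather than Flink's actual MathUtils.murmurHash, and the class and method names are invented for illustration, so the exact counts will differ from Ovidiu's table. It does show the mechanism by which a small number of distinct keys can land on the same parallel instance.

```java
import java.util.Arrays;

// Sketch of key-group-based assignment:
//   keyGroup      = murmur(key.hashCode()) % maxParallelism
//   operatorIndex = keyGroup * parallelism / maxParallelism
public class KeyGroupSketch {

    // MurmurHash3 fmix32 finalizer - a stand-in, NOT Flink's MathUtils.murmurHash
    static int murmur(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // map a key to one of maxParallelism key groups
    static int keyGroupFor(Object key, int maxParallelism) {
        return (murmur(key.hashCode()) & 0x7fffffff) % maxParallelism;
    }

    // each operator instance owns a contiguous range of key groups
    static int operatorFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // Flink's default minimum
        int parallelism = 4;      // number of slots / parallel instances
        int[] perOperator = new int[parallelism];
        for (int key = 1; key <= 1024; key++) {
            int kg = keyGroupFor(key, maxParallelism);
            perOperator[operatorFor(kg, maxParallelism, parallelism)]++;
        }
        // with few distinct keys the per-operator counts can be very uneven
        System.out.println(Arrays.toString(perOperator));
    }
}
```

Running the loop with, say, only 4 distinct keys instead of 1024 makes the skew Aljoscha describes obvious: nothing stops all 4 keys from hashing into key groups owned by the same operator index.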