From: Aljoscha Krettek
Date: Fri, 09 Dec 2016 04:05:59 +0000
Subject: Re: Parallelism and stateful mapping with Flink
To: user@flink.apache.org

I commented on the issue with a way that should work.

On Fri, 9 Dec 2016 at 01:00 Chesnay Schepler wrote:
> Done. https://issues.apache.org/jira/browse/FLINK-5299
>
> On 08.12.2016 16:50, Ufuk Celebi wrote:
> > Would you like to open an issue for this for starters, Chesnay? It would be
> > good to fix for the upcoming release, even.
> >
> > On 8 December 2016 at 16:39:58, Chesnay Schepler (chesnay@apache.org) wrote:
> >> It would be neat if we could support arrays as keys directly; it should
> >> boil down to checking the key type and, in the case of an array, injecting a
> >> KeySelector that calls Arrays.hashCode(array).
> >> This worked for me when I ran into the same issue while experimenting
> >> with some stuff.
> >>
> >> The batch API can use arrays as keys as well, so it's also a matter of
> >> consistency imo.
> >>
> >> Regards,
> >> Chesnay
> >>
> >> On 08.12.2016 16:23, Ufuk Celebi wrote:
> >>> @Aljoscha: I remember that someone else ran into this, too. Should we
> >>> address arrays as keys specifically in the API? Prohibit? Document this?
> >>>
> >>> – Ufuk
> >>>
> >>> On 7 December 2016 at 17:41:40, Andrew Roberts (aroberts@fuze.com) wrote:
> >>>> Sure!
> >>>>
> >>>> (Aside, it turns out that the issue was using an `Array[Byte]` as a key -
> >>>> byte arrays don't appear to have a stable hashCode. I'll provide the
> >>>> skeleton for fullness, though.)
> >>>>
> >>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
> >>>> env.setParallelism(Config.callAggregator.parallelism)
> >>>>
> >>>> env.addSource(kafkaSource)
> >>>>   .flatMap(transformToRecords(_))
> >>>>   .keyBy(b => new String(b.rowKey)) // rowKey is Array[Byte]
> >>>>   .map(new StatefulAggregator())
> >>>>   .addSink(hbaseSink)
> >>>>
> >>>> Again, wrapping my keyBy function in `new String()` has fixed my issue.
> >>>> Thanks!
> >>>>
> >>>> -a
> >>>>
> >>>>> On Dec 7, 2016, at 11:28 AM, Stefan Richter wrote:
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> could you maybe provide the (minimal) code for the problematic job?
> >>>>> Also, are you sure that the keyBy is working on the correct key attribute?
> >>>>>
> >>>>> Best,
> >>>>> Stefan
> >>>>>
> >>>>>> Am 07.12.2016 um 15:57 schrieb Andrew Roberts:
> >>>>>>
> >>>>>> Hello,
> >>>>>>
> >>>>>> I'm trying to perform a stateful mapping of some objects coming in from
> >>>>>> Kafka in a parallelized Flink job (set on the job using
> >>>>>> env.setParallelism(3)). The data source is a Kafka topic, but the
> >>>>>> partitions aren't meaningfully keyed for this operation (each Kafka
> >>>>>> message is flatMapped to between 0-2 objects, with potentially different
> >>>>>> keys). I have a keyBy() operator directly before my map(), but I'm
> >>>>>> seeing objects with the same key distributed to different parallel task
> >>>>>> instances, as reported by getRuntimeContext().getIndexOfThisSubtask().
> >>>>>>
> >>>>>> My understanding of keyBy is that it would segment the stream by key,
> >>>>>> and guarantee that all data with a given key would hit the same
> >>>>>> instance. Am I possibly seeing residual "keying" from the Kafka topic?
> >>>>>>
> >>>>>> I'm running Flink 1.1.3 in Scala. Please let me know if I can add more info.
> >>>>>>
> >>>>>> Thanks,
> >>>>>>
> >>>>>> Andrew
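The root cause discussed above can be demonstrated in plain Scala, with no Flink dependencies (the key value below is illustrative): JVM arrays compare and hash by identity, so `Array[Byte]` makes an unstable `keyBy` key, while `new String(bytes)` (the workaround Andrew used) and `java.util.Arrays.hashCode` (the content-based hash Chesnay suggests injecting via a KeySelector) both depend only on the array's contents.

```scala
import java.util.Arrays

object ByteArrayKeyDemo {
  def main(args: Array[String]): Unit = {
    // Two byte arrays with identical contents.
    val a: Array[Byte] = "row-key-1".getBytes("UTF-8")
    val b: Array[Byte] = "row-key-1".getBytes("UTF-8")

    // Arrays use reference equality and identity hashCode on the JVM,
    // so hash partitioning on them can route equal keys to different subtasks.
    println(a == b)            // false: distinct objects
    println(a.sameElements(b)) // true: same contents

    // Workaround from the thread: wrap the bytes in a String,
    // whose equals/hashCode are defined over its contents.
    println(new String(a, "UTF-8") == new String(b, "UTF-8")) // true

    // Suggested API-level fix: hash the contents directly.
    println(Arrays.hashCode(a) == Arrays.hashCode(b)) // true
  }
}
```

Note that `a.hashCode` itself is left unasserted above: identity hashes for two distinct objects are merely *very likely* to differ, not guaranteed to.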