flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Parallelism and stateful mapping with Flink
Date Thu, 08 Dec 2016 17:00:27 GMT
Done. https://issues.apache.org/jira/browse/FLINK-5299

On 08.12.2016 16:50, Ufuk Celebi wrote:
> Would you like to open an issue for this for starters Chesnay? Would be good
> to fix for the upcoming release even.
>
>
> On 8 December 2016 at 16:39:58, Chesnay Schepler (chesnay@apache.org) wrote:
>> It would be neat if we could support arrays as keys directly; it should
>> boil down to checking the key type and in case of an array injecting a
>> KeySelector that calls Arrays.hashCode(array).
>> This worked for me when I ran into the same issue while experimenting
>> with some stuff.
>>   
>> The batch API can use arrays as keys as well, so it's also a matter of
>> consistency imo.
>>   
>> Regards,
>> Chesnay
>>   
>> On 08.12.2016 16:23, Ufuk Celebi wrote:
>>> @Aljoscha: I remember that someone else ran into this, too. Should we
>>> address arrays as keys specifically in the API? Prohibit? Document this?
>>> – Ufuk
>>>
>>> On 7 December 2016 at 17:41:40, Andrew Roberts (aroberts@fuze.com) wrote:
>>>> Sure!
>>>>
>>>> (Aside, it turns out that the issue was using an `Array[Byte]` as a key -
>>>> byte arrays don’t appear to have a stable hashCode. I’ll provide the
>>>> skeleton for fullness, though.)
>>>>
>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>> env.setParallelism(Config.callAggregator.parallelism)
>>>>
>>>> env.addSource(kafkaSource)
>>>>   .flatMap(transformToRecords(_))
>>>>   .keyBy(b => new String(b.rowKey)) // rowKey is Array[Byte]
>>>>   .map(new StatefulAggregator())
>>>>   .addSink(hbaseSink)
>>>>
>>>>
>>>> Again, wrapping my keyBy function in `new String()` has fixed my issue. Thanks!
>>>>
>>>> -a
>>>>
>>>>
>>>>
>>>>> On Dec 7, 2016, at 11:28 AM, Stefan Richter wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> could you maybe provide the (minimal) code for the problematic job? Also,
>>>>> are you sure that the keyBy is working on the correct key attribute?
>>>>>
>>>>> Best,
>>>>> Stefan
>>>>>
>>>>>> Am 07.12.2016 um 15:57 schrieb Andrew Roberts :
>>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I’m trying to perform a stateful mapping of some objects coming in from
>>>>>> Kafka in a parallelized flink job (set on the job using
>>>>>> env.setParallelism(3)). The data source is a kafka topic, but the
>>>>>> partitions aren’t meaningfully keyed for this operation (each kafka
>>>>>> message is flatMapped to between 0-2 objects, with potentially different
>>>>>> keys). I have a keyBy() operator directly before my map(), but I’m seeing
>>>>>> objects with the same key distributed to different parallel task
>>>>>> instances, as reported by getRuntimeContext().getIndexOfThisSubtask().
>>>>>>
>>>>>> My understanding of keyBy is that it would segment the stream by key, and
>>>>>> guarantee that all data with a given key would hit the same instance. Am
>>>>>> I possibly seeing residual “keying” from the kafka topic?
>>>>>>
>>>>>> I’m running flink 1.1.3 in scala. Please let me know if I can add more
>>>>>> info.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Andrew
>>>>
>>   
>>   
>
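
For readers landing on this thread: the behaviour discussed above can be reproduced without Flink. JVM arrays inherit Object.hashCode, which is identity-based, so two byte arrays with the same contents usually hash differently, while Arrays.hashCode and a String built from the bytes hash by content. The sketch below is only an illustration under stated assumptions: `ArrayKeyDemo`, `subtaskFor`, `contentHashKey` and `stringKey` are hypothetical names, and `subtaskFor` is a simplified stand-in for key routing, not Flink's actual assignment logic. It also decodes with ISO-8859-1 rather than the platform default charset used by `new String(bytes)`, because a default such as UTF-8 can map malformed byte sequences to a replacement character and so merge distinct binary keys.

```scala
import java.nio.charset.StandardCharsets
import java.util.Arrays

object ArrayKeyDemo {
  // Hypothetical, simplified stand-in for key routing: the target subtask is
  // a function of the key's hashCode. Flink's real assignment does more
  // hashing on top, but it also starts from hashCode.
  def subtaskFor(key: Any, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  // Content-based hash, in the spirit of the Arrays.hashCode suggestion.
  def contentHashKey(bytes: Array[Byte]): Int = Arrays.hashCode(bytes)

  // Content-based key, in the spirit of the new String(...) workaround.
  // ISO-8859-1 maps every byte to exactly one char, so distinct arrays
  // always produce distinct strings.
  def stringKey(bytes: Array[Byte]): String =
    new String(bytes, StandardCharsets.ISO_8859_1)

  def main(args: Array[String]): Unit = {
    val a = "row-42".getBytes(StandardCharsets.UTF_8)
    val b = "row-42".getBytes(StandardCharsets.UTF_8) // same contents, different object

    // Identity-based: these usually differ and can change between JVM runs.
    println(s"identity hashes: ${a.hashCode} vs ${b.hashCode}")

    // Content-based: equal whenever the contents are equal.
    println(s"content hashes equal: ${contentHashKey(a) == contentHashKey(b)}")
    println(s"string keys equal: ${stringKey(a) == stringKey(b)}")
    println(s"same subtask: ${subtaskFor(stringKey(a), 3) == subtaskFor(stringKey(b), 3)}")
  }
}
```

One design note: keying by the Int returned from Arrays.hashCode makes any two arrays that happen to share a hash value share one key (and therefore one keyed state), whereas a String key keeps distinct row keys distinct.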

