flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Parallelism and stateful mapping with Flink
Date Thu, 08 Dec 2016 15:39:54 GMT
It would be neat if we could support arrays as keys directly; it should
boil down to checking the key type and, in the case of an array, injecting
a KeySelector that calls Arrays.hashCode(array).
This worked for me when I ran into the same issue while experimenting.
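A minimal plain-Scala sketch of the idea, with no Flink dependency: the hypothetical helper `contentKey` computes what the proposed KeySelector would return for an `Array[Byte]`. JVM arrays inherit the identity-based `Object.hashCode`, so two arrays holding the same bytes usually hash differently, while `java.util.Arrays.hashCode` is computed from the contents.

```scala
import java.nio.charset.StandardCharsets
import java.util.Arrays

object ArrayKeyDemo {
  // Hypothetical helper: the value a KeySelector could emit for a byte-array key.
  // Arrays.hashCode hashes the array's contents, not its identity.
  def contentKey(bytes: Array[Byte]): Int = Arrays.hashCode(bytes)

  def main(args: Array[String]): Unit = {
    val a = "row-42".getBytes(StandardCharsets.UTF_8)
    val b = "row-42".getBytes(StandardCharsets.UTF_8) // same bytes, different instance

    // Identity-based: typically false, which is what breaks keyBy on raw arrays.
    println(s"default hashCodes equal: ${a.hashCode == b.hashCode}")
    // Content-based: always true for equal contents.
    println(s"content keys equal: ${contentKey(a) == contentKey(b)}")
  }
}
```

In a job this would correspond to something like `.keyBy(r => java.util.Arrays.hashCode(r.rowKey))`. One trade-off: keying by the `Int` hash alone means two different arrays whose hashes collide would share keyed state, so a key type with content-based `equals` as well as `hashCode` is the safer choice.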

The batch API can use arrays as keys as well, so it's also a matter of 
consistency imo.

Regards,
Chesnay

On 08.12.2016 16:23, Ufuk Celebi wrote:
> @Aljoscha: I remember that someone else ran into this, too. Should we address arrays as keys specifically in the API? Prohibit them? Document this?
>
> – Ufuk
>
> On 7 December 2016 at 17:41:40, Andrew Roberts (aroberts@fuze.com) wrote:
>> Sure!
>>   
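>> (Aside: it turns out that the issue was using an `Array[Byte]` as a key. Byte arrays
>> don’t have a stable hashCode: like all JVM arrays, they use the identity-based
>> Object.hashCode rather than one computed from their contents. I’ll provide the
>> skeleton for completeness, though.)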
>>   
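>> import org.apache.flink.streaming.api.scala._
>>
>> // kafkaSource, transformToRecords, StatefulAggregator and hbaseSink are defined elsewhere
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> env.setParallelism(Config.callAggregator.parallelism)
>>
>> env.addSource(kafkaSource)
>>   .flatMap(transformToRecords(_))
>>   .keyBy(b => new String(b.rowKey)) // rowKey is Array[Byte]; String hashes by content
>>   .map(new StatefulAggregator())
>>   .addSink(hbaseSink)
>>
>> env.execute()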
>>   
>>   
>> Again, wrapping the key in `new String()` inside my keyBy function fixed the issue. Thanks!
>>   
>> -a
>>   
>>   
>>   
>>> On Dec 7, 2016, at 11:28 AM, Stefan Richter wrote:
>>>
>>> Hi,
>>>
>>> could you maybe provide the (minimal) code for the problematic job? Also, are you sure
>>> that the keyBy is working on the correct key attribute?
>>> Best,
>>> Stefan
>>>
>>>> On 07.12.2016 at 15:57, Andrew Roberts wrote:
>>>>
>>>> Hello,
>>>>
>>>> I’m trying to perform a stateful mapping of some objects coming in from
>>>> Kafka in a parallelized Flink job (parallelism set on the job using
>>>> env.setParallelism(3)). The data source is a Kafka topic, but the
>>>> partitions aren’t meaningfully keyed for this operation (each Kafka
>>>> message is flatMapped to between 0 and 2 objects, with potentially
>>>> different keys). I have a keyBy() operator directly before my map(), but
>>>> I’m seeing objects with the same key distributed to different parallel
>>>> task instances, as reported by
>>>> getRuntimeContext().getIndexOfThisSubtask().
>>>>
>>>> My understanding of keyBy is that it would segment the stream by key and
>>>> guarantee that all data with a given key would hit the same instance. Am
>>>> I possibly seeing residual “keying” from the Kafka topic?
>>>>
>>>> I’m running Flink 1.1.3 with Scala. Please let me know if I can add more
>>>> info.
>>>>
>>>> Thanks,
>>>>
>>>> Andrew
>>   
>>   
>
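Andrew's symptom (records with equal row keys reported by different subtasks) follows from keyed partitioning being a function of the key's hashCode. The sketch below uses a simplified stand-in, not Flink's actual assignment formula: the hypothetical `subtaskFor` picks a subtask from `key.hashCode` modulo the parallelism. The hypothetical `stringKey` also swaps the platform-default charset used by `new String(b.rowKey)` for ISO-8859-1, which maps every byte to exactly one char, so distinct byte arrays always give distinct strings; with the default charset, the JDK leaves the handling of bytes that are invalid in that charset unspecified, so different row keys could in principle collapse onto one String key.

```scala
import java.nio.charset.StandardCharsets

object PartitionDemo {
  // Simplified stand-in for hash partitioning (NOT Flink's exact formula):
  // the target subtask depends only on the key's hashCode.
  def subtaskFor(key: Any, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  // Lossless key: ISO-8859-1 maps each of the 256 byte values to one char.
  def stringKey(bytes: Array[Byte]): String =
    new String(bytes, StandardCharsets.ISO_8859_1)

  def main(args: Array[String]): Unit = {
    val parallelism = 3
    // Five separate arrays holding the same row key, as a flatMap would produce.
    val rowKeys = Seq.fill(5)("row-42".getBytes(StandardCharsets.UTF_8))

    // Raw arrays hash by identity, so the same logical key may scatter.
    println(rowKeys.map(k => subtaskFor(k, parallelism)).distinct)
    // String keys hash by content, so every copy lands on one subtask.
    println(rowKeys.map(k => subtaskFor(stringKey(k), parallelism)).distinct)
  }
}
```

The first line of output varies from run to run; the second always contains a single subtask index.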

